[jira] [Updated] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt

2014-10-12 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1670:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the follow-up patch for system tests. +1 and committed to trunk and 
0.8.2.

Yes, it's possible for the broker to break the message set into smaller chunks 
that fit in the configured log segment size. However, there may be some benefit 
to keeping the original message set as a whole. For example, if the message set 
is compressed, this guarantees that either all or none of the messages in the 
set are replicated to the followers.

 Corrupt log files for segment.bytes values close to Int.MaxInt
 --

 Key: KAFKA-1670
 URL: https://issues.apache.org/jira/browse/KAFKA-1670
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1670.patch, KAFKA-1670.patch, 
 KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, 
 KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, 
 KAFKA-1670_2014-10-07_18:39:31.patch


 The maximum value for the topic-level config {{segment.bytes}} is 
 {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt 
 their log files, leaving them unreadable.*
 We set {{segment.bytes}} to {{2122317824}}, which is well below the maximum. 
 One by one, the ISR of all partitions shrank to 1. Brokers would crash when 
 restarted, attempting to read from a negative offset in a log file. After 
 discovering that many segment files had grown to 4 GB or more, we were forced 
 to shut down our *entire production Kafka cluster* for several hours while we 
 split all segment files into 1 GB chunks.
 Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used 
 inconsistently. It is treated as a *soft* maximum for the size of the segment 
 file 
 (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26), 
 with logs rolled only after they exceed this value 
 (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246). 
 However, much of the code that deals with log files uses *ints* to store the 
 size of the file and the position in the file. Overflow of these ints leads 
 the broker to append to the segments indefinitely, and to fail to read these 
 segments for consuming or recovery.
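 As a minimal illustration of the arithmetic (plain Scala in the REPL, with 
 hypothetical sizes; this is not the Kafka code):
 {code}
 val size: Int = Int.MaxValue        // 2147483647: segment already at the configured cap
 val afterAppend: Int = size + 1000  // wraps to -2147482649: Int overflow
 // Any "roll once the size exceeds segment.bytes" comparison is now false again,
 // so appends continue past 2 GB while reads see a negative position.
 {code}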
 This is trivial to reproduce:
 {code}
 $ bin/kafka-topics.sh --topic segment-bytes-test --create 
 --replication-factor 2 --partitions 1 --zookeeper zkhost:2181
 $ bin/kafka-topics.sh --topic segment-bytes-test --alter --config 
 segment.bytes=2147483647 --zookeeper zkhost:2181
 $ yes "Int.MaxValue is a ridiculous bound on file size in 2014" | 
 bin/kafka-console-producer.sh --broker-list localhost:6667 
 --topic segment-bytes-test
 {code}
 After running for a few minutes, the log file is corrupt:
 {code}
 $ ls -lh data/segment-bytes-test-0/
 total 9.7G
 -rw-r--r-- 1 root root  10M Oct  3 19:39 .index
 -rw-r--r-- 1 root root 9.7G Oct  3 19:39 .log
 {code}
 We recovered the data from the log files using a simple Python script: 
 https://gist.github.com/also/9f823d9eb9dc0a410796
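 For readers without the gist handy, a minimal sketch of the same recovery 
 idea in Scala, assuming the 0.8 on-disk entry layout of an 8-byte offset, a 
 4-byte message size, then the message bytes (the linked Python script is the 
 authoritative version):
 {code}
 import java.io.{DataInputStream, EOFException, FileInputStream}

 object RecoverSegment {
   def main(args: Array[String]): Unit = {
     val in = new DataInputStream(new FileInputStream(args(0)))
     try {
       while (true) {
         val offset = in.readLong() // logical offset of this entry
         val size   = in.readInt()  // byte size of the message that follows
         if (size <= 0) throw new EOFException("corrupt entry")
         val message = new Array[Byte](size)
         in.readFully(message)      // ...append (offset, message) to a fresh segment...
       }
     } catch {
       case _: EOFException => ()   // end of recoverable data
     } finally in.close()
   }
 }
 {code}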



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-12 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168692#comment-14168692
 ] 

Neha Narkhede commented on KAFKA-1555:
--

bq. My vote would be to update our documentation - 
http://kafka.apache.org/documentation.html

+1. Though it will be less confusing if we wait for 0.8.2 to be released.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, 
 KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, 
 KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 
 brokers can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, 
 but no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these 
 requirements due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning all 3 replicas, leader A and followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
 2. acks=2. The producer sends a message m. It's acknowledged by A and B. At 
 this time, although C hasn't received m, C is still in ISR. If A is killed, C 
 can be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. The producer sends a 
 message m to A and receives an acknowledgement. A disk failure happens on A 
 before B and C replicate m. Message m is lost.
 In summary, no existing configuration can satisfy the requirements.
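 For context, the patches attached here add a {{min.insync.replicas}} setting 
 (released in 0.8.2); a sketch of using it to meet requirement 2, with a 
 hypothetical topic name:
 {code}
 # Refuse writes unless at least 2 replicas are in sync (topic-level override):
 bin/kafka-topics.sh --alter --topic critical-topic \
   --config min.insync.replicas=2 --zookeeper zkhost:2181
 # Producer side: wait for acknowledgement from all in-sync replicas, so an
 # acknowledged message exists on at least 2 brokers, or the send fails and can
 # be retried:
 #   request.required.acks=-1
 {code}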



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-12 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168696#comment-14168696
 ] 

Jun Rao commented on KAFKA-1634:


Joel,

I don't see OffsetFetchResponse containing a timestamp (it only contains offset, 
metadata, and error code). I agree that it's better to move retentionMs to the 
topic level of OffsetCommitRequest. We can bump up the protocol version if 
needed.

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
 Fix For: 0.8.2


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed, and isn't it now always handled server-side? Would one of
 the devs mind updating the API spec wiki?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2014-10-12 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168802#comment-14168802
 ] 

Neha Narkhede commented on KAFKA-1634:
--

+1 to your suggestions above. Thanks [~jjkoshy]

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
 Fix For: 0.8.2


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed, and isn't it now always handled server-side? Would one of
 the devs mind updating the API spec wiki?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1701) Improve controller and broker message handling.

2014-10-12 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1701:
-
Reviewer: Neha Narkhede

 Improve controller and broker message handling.
 ---

 Key: KAFKA-1701
 URL: https://issues.apache.org/jira/browse/KAFKA-1701
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin

 This ticket is a memo for future controller refactoring.
 It is related to KAFKA-1547. Ideally, the broker should only follow 
 instructions from the controller rather than handling them smartly. For 
 KAFKA-1547, the controller should filter out the partitions whose leader is 
 not up yet before sending LeaderAndIsrRequest to the broker. 
 The idea is that the controller should handle all the edge cases instead of 
 letting the broker do it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1680) JmxTool exits if no arguments are given

2014-10-12 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1680:
-
Labels: newbie  (was: )

 JmxTool exits if no arguments are given
 ---

 Key: KAFKA-1680
 URL: https://issues.apache.org/jira/browse/KAFKA-1680
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-1680.patch


 JmxTool has no required arguments, but it exits if no arguments are provided. 
 You can work around this by passing a non-option argument, which will be 
 ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}.
 It looks like this was broken in KAFKA-1291 / 
 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-12 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168819#comment-14168819
 ] 

Neha Narkhede commented on KAFKA-1305:
--

Increasing the queue size by a little doesn't really solve the problem. We 
should conduct more tests on an unbounded controller queue if we have any 
doubt about whether it will work. [~sriharsha] I will help you review changes 
to the controller if you are up for updating your patch.

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shut down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to. However, 
 that removal cannot take place (t@18 below) because the preferred replica 
 leader election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 

Re: Review Request 26564: Patch for KAFKA-1471

2014-10-12 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/#review56365
---


Thanks for updating the patch, Ewen. Two minor comments; probably worth 
updating PerfConfig as well as MessageTest.

- Neha Narkhede


On Oct. 10, 2014, 5:43 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26564/
 ---
 
 (Updated Oct. 10, 2014, 5:43 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1471
 https://issues.apache.org/jira/browse/KAFKA-1471
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 79d57f9bf31606ffa5400f2f12356eba84703cc2 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 8e9ba0b284671989f87d9c421bc98f5c4384c260 
   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
 17e2c6e9dfd789acb4b6db37c780c862667e4e11 
 
 Diff: https://reviews.apache.org/r/26564/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: Review Request 26563: Patch for KAFKA-1692

2014-10-12 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26563/#review56366
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 10, 2014, 5:38 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26563/
 ---
 
 (Updated Oct. 10, 2014, 5:38 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1692
 https://issues.apache.org/jira/browse/KAFKA-1692
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1692 Include client ID in new producer IO thread name.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 f58b8508d3f813a51015abed772c704390887d7e 
 
 Diff: https://reviews.apache.org/r/26563/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Updated] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID

2014-10-12 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1692:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. Pushed to trunk

 [Java New Producer]  IO Thread Name Must include  Client ID
 ---

 Key: KAFKA-1692
 URL: https://issues.apache.org/jira/browse/KAFKA-1692
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Trivial
  Labels: newbie
 Attachments: KAFKA-1692.patch


 Please add the client id so that people looking at JConsole or a profiling 
 tool can see threads by client id, since a single JVM can have multiple 
 producer instances.  
 org.apache.kafka.clients.producer.KafkaProducer
 {code}
 String ioThreadName = "kafka-producer-network-thread";
 if (clientId != null) {
   ioThreadName = ioThreadName + " | " + clientId;
 }
 this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
 {code}
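 With this change, a producer configured with a hypothetical client.id of 
 {{my-client}} would show up in JConsole as 
 {{kafka-producer-network-thread | my-client}}.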



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID

2014-10-12 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168831#comment-14168831
 ] 

Neha Narkhede edited comment on KAFKA-1692 at 10/12/14 11:00 PM:
-

Thanks for the patch. Pushed to trunk and 0.8.2


was (Author: nehanarkhede):
Thanks for the patch. Pushed to trunk

 [Java New Producer]  IO Thread Name Must include  Client ID
 ---

 Key: KAFKA-1692
 URL: https://issues.apache.org/jira/browse/KAFKA-1692
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Trivial
  Labels: newbie
 Attachments: KAFKA-1692.patch


 Please add the client id so that people looking at JConsole or a profiling 
 tool can see threads by client id, since a single JVM can have multiple 
 producer instances.  
 org.apache.kafka.clients.producer.KafkaProducer
 {code}
 String ioThreadName = "kafka-producer-network-thread";
 if (clientId != null) {
   ioThreadName = ioThreadName + " | " + clientId;
 }
 this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression

2014-10-12 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168834#comment-14168834
 ] 

Ewen Cheslack-Postava commented on KAFKA-1471:
--

Updated reviewboard https://reviews.apache.org/r/26564/diff/
 against branch origin/trunk

 Add Producer Unit Tests for LZ4 and LZ4HC compression
 -

 Key: KAFKA-1471
 URL: https://issues.apache.org/jira/browse/KAFKA-1471
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.2

 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, 
 KAFKA-1471_2014-10-12_16:02:19.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26564: Patch for KAFKA-1471

2014-10-12 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/
---

(Updated Oct. 12, 2014, 11:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1471
https://issues.apache.org/jira/browse/KAFKA-1471


Repository: kafka


Description (updated)
---

KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; 
patched by James Oliver


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
79d57f9bf31606ffa5400f2f12356eba84703cc2 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
8e9ba0b284671989f87d9c421bc98f5c4384c260 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
129cc013f68d2b89bdfea74a1d9ee26a011791f2 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
17e2c6e9dfd789acb4b6db37c780c862667e4e11 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
4837585d03535043a6f25938368988128df9b94a 

Diff: https://reviews.apache.org/r/26564/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression

2014-10-12 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1471:
-
Attachment: KAFKA-1471_2014-10-12_16:02:19.patch

 Add Producer Unit Tests for LZ4 and LZ4HC compression
 -

 Key: KAFKA-1471
 URL: https://issues.apache.org/jira/browse/KAFKA-1471
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
 Fix For: 0.8.2

 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, 
 KAFKA-1471_2014-10-12_16:02:19.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #298

2014-10-12 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/298/changes

Changes:

[neha.narkhede] KAFKA-1692 Include client ID in new producer IO thread name; 
reviewed by Neha Narkhede

--
[...truncated 2018 lines...]

kafka.admin.AdminTest > testPartitionReassignmentNonOverlappingReplicas FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testReassigningNonExistingPartition FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testResumePartitionReassignmentThatWasCompleted FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testPreferredReplicaJsonData FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testShutdownBroker FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testTopicConfigChange FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 

Re: Review Request 26564: Patch for KAFKA-1471

2014-10-12 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/#review56368
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 12, 2014, 11:02 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26564/
 ---
 
 (Updated Oct. 12, 2014, 11:02 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1471
 https://issues.apache.org/jira/browse/KAFKA-1471
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; 
 patched by James Oliver
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 79d57f9bf31606ffa5400f2f12356eba84703cc2 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 8e9ba0b284671989f87d9c421bc98f5c4384c260 
   core/src/main/scala/kafka/tools/PerfConfig.scala 
 129cc013f68d2b89bdfea74a1d9ee26a011791f2 
   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
 17e2c6e9dfd789acb4b6db37c780c862667e4e11 
   core/src/test/scala/unit/kafka/message/MessageTest.scala 
 4837585d03535043a6f25938368988128df9b94a 
 
 Diff: https://reviews.apache.org/r/26564/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-12 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168838#comment-14168838
 ] 

Sriharsha Chintalapani commented on KAFKA-1305:
---

[~nehanarkhede] so the changes you are looking for are to remove the config 
option and make the LinkedBlockingQueue unbounded?
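
If so, a minimal sketch of that change (element type and field name are 
illustrative, based on this discussion rather than the committed patch):
{code}
import java.util.concurrent.LinkedBlockingQueue

object QueueSketch {
  // Illustrative stand-in for the (request, callback) pairs the controller queues:
  case class QueueItem(request: AnyRef, callback: AnyRef => Unit)

  // Before: bounded by controller.message.queue.size; put() blocks when the
  // queue fills (e.g. while retrying a dead broker), wedging any caller that
  // holds the controller lock -- the hang shown in the thread dump above.
  //   val messageQueue = new LinkedBlockingQueue[QueueItem](config.controllerMessageQueueSize)

  // After: no capacity bound, so sendRequest()'s put() never blocks on capacity.
  val messageQueue = new LinkedBlockingQueue[QueueItem]()
}
{code}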

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shut down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to. However, 
 that removal cannot take place (t@18 below) because the preferred replica 
 leader election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
   at 
 

Re: Review Request 26564: Patch for KAFKA-1471

2014-10-12 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/#review56369
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 12, 2014, 11:02 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26564/
 ---
 
 (Updated Oct. 12, 2014, 11:02 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1471
 https://issues.apache.org/jira/browse/KAFKA-1471
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; 
 patched by James Oliver
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 79d57f9bf31606ffa5400f2f12356eba84703cc2 
   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
 8e9ba0b284671989f87d9c421bc98f5c4384c260 
   core/src/main/scala/kafka/tools/PerfConfig.scala 
 129cc013f68d2b89bdfea74a1d9ee26a011791f2 
   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
 17e2c6e9dfd789acb4b6db37c780c862667e4e11 
   core/src/test/scala/unit/kafka/message/MessageTest.scala 
 4837585d03535043a6f25938368988128df9b94a 
 
 Diff: https://reviews.apache.org/r/26564/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[jira] [Created] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Alexis Midon (JIRA)
Alexis Midon created KAFKA-1702:
---

 Summary: Messages silently Lost by producer
 Key: KAFKA-1702
 URL: https://issues.apache.org/jira/browse/KAFKA-1702
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Jun Rao



Hello,

we lost millions of messages because of this {{try/catch}} in the producer 
{{DefaultEventHandler}}:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116

If a Throwable is caught by this {{try/catch}}, the retry policy has no 
effect and all yet-to-be-sent messages are lost (the error breaks the loop 
over the broker list).
This issue is very hard to detect because the producer (async or sync) cannot 
even catch the error, and *all* the metrics are updated as if everything was 
fine.

Only an abnormal drop in the producers' network I/O or in the incoming message 
rate on the brokers, or alerting on errors in the producer logs, could have 
revealed the issue. 

This behavior was introduced by KAFKA-300. I can't see a good reason for it, so 
here is a patch that lets the retry policy do its job when such a 
{{Throwable}} occurs.

Thanks in advance for your help.

Alexis

ps: you might wonder how this {{try/catch}} could ever catch anything? 
{{DefaultEventHandler#groupMessagesToSet}} looks so harmless. 

Here are the details:
We use Snappy compression. When the native snappy library is not installed on 
the host, Snappy, during the initialization of the class 
{{org.xerial.snappy.Snappy}}, will [write a C 
library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312]
 in the JVM temp directory {{java.io.tmpdir}}.

In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an 
instance reboot (thank you 
[AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory 
was removed. The JVM was then running with a non-existent temp dir. The Snappy 
class could not be initialized, and the following message would be silently 
logged:

{code}
ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: 
Failed to send messages
! java.lang.NoClassDefFoundError: Could not initialize class 
org.xerial.snappy.Snappy
{code}
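
A condensed sketch of the failure mode (all names are illustrative, not the 
actual DefaultEventHandler code):
{code}
object SwallowSketch {
  case class Batch(id: Int)                        // illustrative, not a Kafka type
  def send(b: Batch): Boolean = sys.error("boom")  // stands in for the NoClassDefFoundError

  // Wrapping the whole dispatch loop in try/catch means batches never attempted
  // are not collected for retry; the caller sees an empty "failed" list and
  // every metric reports success.
  def dispatchAll(batches: Seq[Batch]): Seq[Batch] = {
    val failed = scala.collection.mutable.ArrayBuffer[Batch]()
    try {
      for (b <- batches) if (!send(b)) failed += b
    } catch {
      case t: Throwable => println(s"Failed to send messages: $t") // logged, then lost
    }
    failed.toSeq
  }

  def main(args: Array[String]): Unit =
    println(dispatchAll(Seq(Batch(1), Batch(2))))  // List() -- both batches silently dropped
}
{code}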





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Alexis Midon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexis Midon updated KAFKA-1702:

Attachment: KAFKA-1702.0.patch

 Messages silently Lost by producer
 --

 Key: KAFKA-1702
 URL: https://issues.apache.org/jira/browse/KAFKA-1702
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Jun Rao
 Attachments: KAFKA-1702.0.patch


 Hello,
 we lost millions of messages because of this {{try/catch}} in the producer 
 {{DefaultEventHandler}}:
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
 If a Throwable is caught by this {{try/catch}}, the retry policy has no 
 effect and all yet-to-be-sent messages are lost (the error breaks the 
 loop over the broker list).
 This issue is very hard to detect because the producer (async or sync) 
 cannot even catch the error, and *all* the metrics are updated as if 
 everything was fine.
 Only an abnormal drop in the producers' network I/O or in the incoming message 
 rate on the brokers, or alerting on errors in the producer logs, could have 
 revealed the issue. 
 This behavior was introduced by KAFKA-300. I can't see a good reason for it, 
 so here is a patch that lets the retry policy do its job when such a 
 {{Throwable}} occurs.
 Thanks in advance for your help.
 Alexis
 ps: you might wonder how this {{try/catch}} could ever catch anything? 
 {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. 
 Here are the details:
 We use Snappy compression. When the native snappy library is not installed on 
 the host, Snappy, during the initialization of the class 
 {{org.xerial.snappy.Snappy}}, will [write a C 
 library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312]
  in the JVM temp directory {{java.io.tmpdir}}.
 In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an 
 instance reboot (thank you 
 [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp 
 directory was removed. The JVM was then running with a non-existent temp dir. 
 The Snappy class could not be initialized, and the following message would be 
 silently logged:
 {code}
 ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: 
 Failed to send messages
 ! java.lang.NoClassDefFoundError: Could not initialize class 
 org.xerial.snappy.Snappy
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Alexis Midon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexis Midon updated KAFKA-1702:

Comment: was deleted

(was: diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..0f7f941 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
-        try {
-          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-            if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent =>
-                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
-                case None => // nothing
-              }
-            })
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+          if (logger.isTraceEnabled) {
+            messagesPerBrokerMap.foreach(partitionAndEvent =>
+              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+          }
+          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+          messageSetPerBrokerOpt match {
+            case Some(messageSetPerBroker) =>
+              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+              failedTopicPartitions.foreach(topicPartition => {
+                messagesPerBrokerMap.get(topicPartition) match {
+                  case Some(data) => failedProduceRequests.appendAll(data)
+                  case None => // nothing
+                }
+              })
+            case None => // failed to group messages
+              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
           }
-        } catch {
-          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
-      case None => // all produce requests failed
+      case None => // failed to collate messages
         messages
     }
   }
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
     /** enforce the compressed.topics config here.
-      *  If the compression codec is anything other than NoCompressionCodec,
-      *    Enable compression only for specified topics if any
-      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      * If the compression codec is anything other than NoCompressionCodec,
+      * Enable compression only for specified topics if any
+      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
       */
-
-    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-      val rawMessages = messages.map(_.message)
-      ( topicAndPartition,
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                debug("Sending %d messages with compression codec %d to %s"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+    try {
+ 

[jira] [Commented] (KAFKA-1277) Keep the summary/description when updating the RB with kafka-patch-review

2014-10-12 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168849#comment-14168849
 ] 

Neha Narkhede commented on KAFKA-1277:
--

[~omkreddy] Tried your latest patch. 
{noformat}
nnarkhed-mn1:kafka nnarkhed$ python kafka-patch-review.py -j KAFKA-1277 -b 
trunk -d test
Configuring reviewboard url to https://reviews.apache.org
Updating your remote branches to pull the latest changes
Usage: rbt post [options] [changenum]

Uploads diffs to create and update review requests.

rbt: error: no such option: -a
ERROR: reviewboard update failed. Exiting.
nnarkhed-mn1:kafka nnarkhed$ rbt --version
RBTools 0.5.2
{noformat}

 Keep the summary/description when updating the RB with kafka-patch-review
 -

 Key: KAFKA-1277
 URL: https://issues.apache.org/jira/browse/KAFKA-1277
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Manikumar Reddy
 Attachments: KAFKA-1277.patch, KAFKA-1277.patch, KAFKA-1277.patch, 
 KAFKA-1277_2014-10-04_16:39:56.patch, KAFKA-1277_2014-10-04_16:51:20.patch, 
 KAFKA-1277_2014-10-04_16:57:30.patch, KAFKA-1277_2014-10-04_17:00:37.patch, 
 KAFKA-1277_2014-10-04_17:01:43.patch, KAFKA-1277_2014-10-04_17:03:08.patch, 
 KAFKA-1277_2014-10-04_17:09:02.patch, KAFKA-1277_2014-10-05_11:04:33.patch, 
 KAFKA-1277_2014-10-05_11:09:08.patch, KAFKA-1277_2014-10-05_11:10:50.patch, 
 KAFKA-1277_2014-10-05_11:18:17.patch


 Today the kafka-patch-review tool always uses a default title and description 
 if they are not specified, even when updating an existing RB. It would be 
 better to leave the current title/description as is.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 26633: Patch for KAFKA-1305

2014-10-12 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26633/
---

Review request for kafka.


Bugs: KAFKA-1305
https://issues.apache.org/jira/browse/KAFKA-1305


Repository: kafka


Description
---

KAFKA-1305. Controller can hang on controlled shutdown with auto leader balance 
enabled.


Diffs
-

  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
90af698b01ec82b6168e02b6af41887ef164ad51 
  system_test/mirror_maker_testsuite/config/server.properties 
c6284122e3dfaa046a453511c17a19a536dc1035 
  system_test/offset_management_testsuite/config/server.properties 
2b988f86052a7bb248e1c7752808cb0a336d0020 
  
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
 41ec6e49272f129ab11a08aec04faf444f1c4f26 
  
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
 727e23701d6c2eb08c7ddde198e042921ef058f8 
  
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
 e6fbbe1e0532eb393a5b100c8b3c58be4c546ec0 
  
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
 fee65bce63564a7403d311be2c27c34e17e835a7 
  system_test/replication_testsuite/config/server.properties 
6becbab60e3943e56a77dc7872febb9764b2dca9 

Diff: https://reviews.apache.org/r/26633/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-12 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168852#comment-14168852
 ] 

Sriharsha Chintalapani commented on KAFKA-1305:
---

Created reviewboard https://reviews.apache.org/r/26633/diff/
 against branch origin/trunk

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shut down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to. However, 
 that removal cannot take place (t@18 below) because the preferred replica 
 leader election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
   at 
 kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
   at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
   at 
 

[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-12 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1305:
--
Attachment: KAFKA-1305.patch

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the meantime, the automatically scheduled preferred replica leader 
 election process started doing its thing and began sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shut down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to. However, 
 that removal cannot take place (t@18 below) because the preferred replica 
 leader election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
   java.lang.Thread.State: TIMED_WAITING
     at java.lang.Thread.sleep(Native Method)
     at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
     at kafka.utils.Utils$.swallow(Utils.scala:167)
     at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
     at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
     at kafka.utils.Logging$class.swallow(Logging.scala:94)
     at kafka.utils.Utils$.swallow(Utils.scala:46)
     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
     - locked java.lang.Object@6dbf14a7
     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

   Locked ownable synchronizers:
     - None
 ...
 Thread-4 - Thread t@17
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
     at sun.misc.Unsafe.park(Native Method)
     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
     at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
     at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
     at kafka.utils.Utils$.inLock(Utils.scala:536)
     at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
     at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
     at kafka.utils.Utils$.swallow(Utils.scala:167)
     at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
     at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
     at kafka.utils.Logging$class.swallow(Logging.scala:94)
     at kafka.utils.Utils$.swallow(Utils.scala:46)
     at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
     at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
     at kafka.Kafka$$anon$1.run(Kafka.scala:42)
 ...
 kafka-scheduler-0 - Thread t@117
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
     at sun.misc.Unsafe.park(Native Method)
     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
     at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
     - 
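
 To make the cycle concrete: a minimal, self-contained sketch (an assumed 
 simplification, not the actual controller code) of the same wait-for 
 relationship - one lock, and one bounded queue whose consumer never drains it. 
 The program intentionally hangs, mirroring the dump above.
 {code}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

object ControllerDeadlockSketch extends App {
  val controllerLock = new ReentrantLock()
  val requestQueue   = new LinkedBlockingQueue[String](1) // bounded per-broker queue
  requestQueue.put("in-flight")                           // queue is already full

  val scheduler = new Thread(() => {            // plays kafka-scheduler-0 (t@117)
    controllerLock.lock()
    try requestQueue.put("LeaderAndIsrRequest") // blocks forever: queue is full and
    finally controllerLock.unlock()             // its consumer retries a dead broker
  }, "kafka-scheduler-0")

  val shutdown = new Thread(() => {             // plays Thread-4 (t@17)
    controllerLock.lock()                       // blocks: scheduler never releases
    try println("controller shutdown complete")
    finally controllerLock.unlock()
  }, "Thread-4")

  scheduler.start()
  shutdown.start()
}
 {code}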

Build failed in Jenkins: Kafka-trunk #299

2014-10-12 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/299/changes

Changes:

[neha.narkhede] KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC 
compression codecs; patched by James Oliver; reviewed by Neha Narkhede

--
[...truncated 1529 lines...]

kafka.admin.AdminTest > testPartitionReassignmentNonOverlappingReplicas FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testReassigningNonExistingPartition FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testResumePartitionReassignmentThatWasCompleted FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testPreferredReplicaJsonData FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testShutdownBroker FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testTopicConfigChange FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at 

[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-12 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168930#comment-14168930
 ] 

Gwen Shapira commented on KAFKA-1555:
-

Awesome.

So I'm updating here: http://svn.apache.org/repos/asf/kafka/site/

It looks like the right thing to do is to create an 082 subdir, copy the current 
documentation there, and add the new reliability guarantees.
We can then re-point the main doc link to that directory when we release. I 
think this is less confusing than modifying the docs after the release. Does 
that make sense?

BTW, there is probably more stuff we need to document for 0.8.2 - perhaps we 
should start filing doc JIRAs for the new version? The new producer will be 
very high on that list. 

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, 
 KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, 
 KAFKA-1555.8.patch, KAFKA-1555.9.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as two brokers being down, service can be blocked, 
 but no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot meet these 
 requirements, due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, so acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, no existing configuration can satisfy the requirements.
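
 A note on the fix under review here: the approach in the attached patches (a 
 min.insync.replicas topic setting, enforced when acks=-1) targets case 3. A 
 minimal sketch with the 0.8.x Scala producer, assuming the 0.8.2 setting name 
 and that min.insync.replicas=2 has been set on the topic; broker addresses 
 and topic name are placeholders:
 {code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object MinIsrProducerSketch extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "broker1:9092,broker2:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("request.required.acks", "-1") // wait for the full ISR
  props.put("producer.type", "sync")       // surface send failures to the caller

  // With min.insync.replicas=2 on the topic, an acknowledged send implies the
  // message is on at least 2 replicas; below that, the broker rejects the write.
  val producer = new Producer[String, String](new ProducerConfig(props))
  producer.send(new KeyedMessage[String, String]("critical-topic", "m"))
  producer.close()
}
 {code}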





[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168929#comment-14168929
 ] 

Jun Rao commented on KAFKA-1702:


Thanks for the patch. Not sure why you want to wrap groupMessagesToSet in a 
try/catch. I thought that's where the NoClassDefFoundError is thrown and you 
want to propagate it to the caller.

With this patch, the producer in sync mode will get the exception. However, a 
producer in async mode will still silently drop messages. This is a general 
limitation and is being fixed in the new Java producer through a callback.

 Messages silently Lost by producer
 --

 Key: KAFKA-1702
 URL: https://issues.apache.org/jira/browse/KAFKA-1702
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Jun Rao
 Attachments: KAFKA-1702.0.patch


 Hello,
 we lost millions of messages because of this {{try/catch}} in the producer 
 {{DefaultEventHandler}}:
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
 If a Throwable is caught by this {{try/catch}}, the retry policy will have no 
 effect and all yet-to-be-sent messages are lost (the error will break the 
 loop over the broker list).
 This issue is very hard to detect because the producer (async or sync) 
 cannot even catch the error, and *all* the metrics are updated as if 
 everything were fine.
 Only an abnormal drop in the producer's network I/O or in the incoming 
 message rate on the brokers, or alerting on errors in the producer logs, 
 could have revealed the issue. 
 This behavior was introduced by KAFKA-300. I can't see a good reason for it, 
 so here is a patch that will let the retry policy do its job when such a 
 {{Throwable}} occurs.
 Thanks in advance for your help.
 Alexis
 ps: you might wonder how this {{try/catch}} could ever catch anything - 
 {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. 
 Here are the details:
 We use Snappy compression. When the native snappy library is not installed on 
 the host, Snappy, during the initialization of the class 
 {{org.xerial.snappy.Snappy}}, will [write a C 
 library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312]
  in the JVM temp directory {{java.io.tmpdir}}.
 In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an 
 instance reboot (thank you 
 [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp 
 directory was removed. The JVM was then running with a non-existent temp dir, 
 the Snappy class was impossible to initialize, and the following message 
 was silently logged:
 {code}
 ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: 
 Failed to send messages
 ! java.lang.NoClassDefFoundError: Could not initialize class 
 org.xerial.snappy.Snappy
 {code}
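
 For illustration, a minimal sketch (a simplification, not the real 
 {{DefaultEventHandler}}) of the failure mode described above: swallowing 
 Throwable at this level means the caller never sees a failure, so the retry 
 policy never runs and the batch is silently dropped.
 {code}
object SwallowSketch extends App {
  // Stand-in for DefaultEventHandler#groupMessagesToSet: with a broken
  // java.io.tmpdir, Snappy class initialization fails with an Error
  // (NoClassDefFoundError), not an Exception.
  def groupMessagesToSet(batch: Seq[String]): Map[String, Seq[String]] =
    throw new NoClassDefFoundError("Could not initialize class org.xerial.snappy.Snappy")

  def sendWithSwallow(batch: Seq[String]): Unit =
    try {
      for ((broker, msgs) <- groupMessagesToSet(batch))
        println(s"sending ${msgs.size} messages to $broker") // stand-in for the real send
    } catch {
      // catching Throwable swallows the Error: the caller sees success,
      // the retry policy never triggers, and the batch is lost
      case t: Throwable => println(s"Failed to send messages: $t")
    }

  sendWithSwallow(Seq("m1", "m2")) // returns normally; no retry, no exception
}
 {code}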





[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-10-12 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168934#comment-14168934
 ] 

Jun Rao commented on KAFKA-1305:


Perhaps we can just set the default queue size to max int for now. If we don't 
see any issue with this, we can make the LinkedBlockingQueue unbounded later. 
Could you also change the default value for auto.leader.rebalance.enable?
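
For reference, a sketch of that stopgap (field name assumed; not the actual 
ControllerChannelManager change):
{code}
object QueueSizingSketch {
  import java.util.concurrent.LinkedBlockingQueue

  // Capacity Int.MaxValue makes the per-broker request queue effectively
  // unbounded, so sendRequest's put() cannot block while the controller lock
  // is held. (LinkedBlockingQueue's no-arg constructor already defaults to
  // Int.MaxValue, so "unbounded later" is the same thing spelled differently.)
  val messageQueue = new LinkedBlockingQueue[AnyRef](Int.MaxValue)
}
{code}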

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.

[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Alexis Midon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168941#comment-14168941
 ] 

Alexis Midon commented on KAFKA-1702:
-

I agree. In async mode, until there is a callback, the best we can do is to 
make sure all the metrics are updated correctly, in particular ResendsPerSec 
and FailedSendsPerSec, which are critical for monitoring async producers.
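
For what it's worth, a hedged sketch (meter name assumed to mirror the 
FailedSendsPerSec metric mentioned above; the actual patch may hook this up 
differently) using the yammer metrics-core API the old producer already 
depends on:
{code}
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics

object ProducerFailureMetricsSketch {
  // Assumed name, mirroring the existing producer stats.
  private val failedSendRate =
    Metrics.newMeter(getClass, "FailedSendsPerSec", "sends", TimeUnit.SECONDS)

  // Mark once per dropped message so dashboards see the loss even though
  // no exception reaches the async caller.
  def onDroppedBatch(size: Int): Unit = failedSendRate.mark(size)
}
{code}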

In sync mode, the producer will get the exception, which is an improvement.

Thanks for your review.



[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer

2014-10-12 Thread Alexis Midon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168942#comment-14168942
 ] 

Alexis Midon commented on KAFKA-1702:
-

Also, if #groupMessagesToSet is not in a try/catch, the error will break the 
loop over the broker list: all messages get dropped, retries are ignored, 
metrics aren't updated, etc.
