[jira] [Updated] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-1670:
---------------------------
Resolution: Fixed
Status: Resolved (was: Patch Available)

Thanks for the follow-up patch for the system tests. +1 and committed to trunk and 0.8.2.

Yes, it's possible for the broker to break the message set into smaller chunks that fit in the configured log segment size. However, there may be some benefit to keeping the original message set whole: for example, if the message set is compressed, this guarantees that either all or none of the messages in the set are replicated to the followers.

Corrupt log files for segment.bytes values close to Int.MaxInt
---------------------------------------------------------------

Key: KAFKA-1670
URL: https://issues.apache.org/jira/browse/KAFKA-1670
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Sriharsha Chintalapani
Priority: Blocker
Fix For: 0.8.2
Attachments: KAFKA-1670.patch, KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, KAFKA-1670_2014-10-07_18:39:31.patch

The maximum value for the topic-level config {{segment.bytes}} is {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt their log files, leaving them unreadable.*

We set {{segment.bytes}} to {{2122317824}}, which is well below the maximum. One by one, the ISR of all partitions shrank to 1. Brokers would crash when restarted, attempting to read from a negative offset in a log file. After discovering that many segment files had grown to 4GB or more, we were forced to shut down our *entire production Kafka cluster* for several hours while we split all segment files into 1GB chunks.

Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used inconsistently. It is treated as a *soft* maximum for the size of the segment file (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26), with logs rolled only after (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246) they exceed this value. However, much of the code that deals with log files uses *ints* to store the size of the file and the position in the file. Overflow of these ints leads the broker to append to the segments indefinitely, and to fail to read these segments for consuming or recovery.

This is trivial to reproduce:

{code}
$ bin/kafka-topics.sh --topic segment-bytes-test --create --replication-factor 2 --partitions 1 --zookeeper zkhost:2181
$ bin/kafka-topics.sh --topic segment-bytes-test --alter --config segment.bytes=2147483647 --zookeeper zkhost:2181
$ yes Int.MaxValue is a ridiculous bound on file size in 2014 | bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 --topic segment-bytes-test
{code}

After running for a few minutes, the log file is corrupt:

{code}
$ ls -lh data/segment-bytes-test-0/
total 9.7G
-rw-r--r-- 1 root root 10M Oct 3 19:39 .index
-rw-r--r-- 1 root root 9.7G Oct 3 19:39 .log
{code}

We recovered the data from the log files using a simple Python script: https://gist.github.com/also/9f823d9eb9dc0a410796

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
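The overflow mechanics described above are easy to model in isolation. The following is a minimal, hypothetical Java sketch (none of these names come from the Kafka code base): a segment size tracked in an int, with a roll check that fires only after the size exceeds segment.bytes. Because the gap between 2122317824 and Int.MaxValue (about 24 MB) can be smaller than a single append, the counter wraps negative before the check ever triggers, and the size comparison then stays false forever.

{code}
// Hypothetical sketch of the overflow: a segment size tracked in an int, with
// a roll check modeled on a "roll only AFTER exceeding the limit" rule.
public class SegmentOverflowSketch {
    public static void main(String[] args) {
        int segmentBytes = 2122317824;       // the segment.bytes value from the report
        int size = 0;                        // int-typed running size
        int messageSize = 50 * 1024 * 1024;  // pretend each append is 50 MB

        for (int i = 0; i < 60; i++) {
            if (size > segmentBytes) {       // soft maximum: roll after exceeding it
                System.out.println("would roll at size " + size);
                return;
            }
            size += messageSize;             // wraps past 2^31-1 and goes negative
        }
        // size is now negative, so "size > segmentBytes" is false forever:
        // the broker keeps appending and the file grows past 4 GB on disk.
        System.out.println("final size counter: " + size);
    }
}
{code}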
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168692#comment-14168692 ]

Neha Narkhede commented on KAFKA-1555:
--------------------------------------

bq. My vote would be to update our documentation - http://kafka.apache.org/documentation.html

+1. Though it will be less confusing if we wait for 0.8.2 to be released.

provide strong consistency with reasonable availability
--------------------------------------------------------

Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
Fix For: 0.8.2
Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch

In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as two brokers being down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker, so acknowledged messages may be stored in only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
2. acks=2. The producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. The producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
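For context, the request.required.acks knob analyzed above is set per producer in the 0.8.x API. Below is a minimal sketch with made-up broker and topic names: even acks=-1, the strongest setting, does not guarantee durability on its own once the ISR has shrunk to a single replica, which is exactly case 3 of the analysis.

{code}
// Minimal, hypothetical use of the 0.8.x producer; broker list and topic
// names are invented for illustration.
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // acks=-1: the leader waits for all replicas currently in ISR. As the
        // analysis notes, ISR may have shrunk to the leader alone, so this is
        // still not sufficient for "no message loss" by itself.
        props.put("request.required.acks", "-1");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<>("my-topic", "key", "message m"));
        producer.close();
    }
}
{code}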
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168696#comment-14168696 ]

Jun Rao commented on KAFKA-1634:
--------------------------------

Joel, I don't see OffsetFetchResponse containing a timestamp (it only contains offset, metadata and error code). I agree that it's better to move retentionMs to the topic level of OffsetCommitRequest. We can bump up the protocol version if needed.

Improve semantics of timestamp in OffsetCommitRequests and update documentation
--------------------------------------------------------------------------------

Key: KAFKA-1634
URL: https://issues.apache.org/jira/browse/KAFKA-1634
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
Fix For: 0.8.2

From the mailing list, following up on this: I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, so that timestamps are now always handled server-side? Would one of the devs mind updating the API spec wiki?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
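To make the suggestion concrete, here is a rough Java mirror of the request shape under discussion, with retentionMs lifted out of the per-partition entries to the topic level and a version bump marking the schema change. Field names are illustrative only; this is not the committed wire format.

{code}
// Illustrative only: a Java mirror of the proposed OffsetCommitRequest shape.
// Names are hypothetical; the real schema lives in the protocol definition.
import java.util.Arrays;
import java.util.List;

public class OffsetCommitShape {
    short versionId = 2;         // bumped when the schema changes, per the comment
    String groupId;
    List<TopicCommit> topics;

    static class TopicCommit {
        String topic;
        long retentionMs;        // proposed: one retention value per topic...
        List<PartitionCommit> partitions;
    }

    static class PartitionCommit {
        int partition;
        long offset;
        String metadata;         // ...and no client-side timestamp down here
    }

    public static void main(String[] args) {
        OffsetCommitShape req = new OffsetCommitShape();
        req.groupId = "my-group";
        TopicCommit t = new TopicCommit();
        t.topic = "my-topic";
        t.retentionMs = 24 * 60 * 60 * 1000L;  // retain committed offsets for a day
        PartitionCommit p = new PartitionCommit();
        p.partition = 0;
        p.offset = 12345L;
        p.metadata = "";
        t.partitions = Arrays.asList(p);
        req.topics = Arrays.asList(t);
        System.out.println("commit " + t.topic + "-" + p.partition + " @ " + p.offset);
    }
}
{code}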
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168802#comment-14168802 ]

Neha Narkhede commented on KAFKA-1634:
--------------------------------------

+1 to your suggestions above. Thanks [~jjkoshy]

Improve semantics of timestamp in OffsetCommitRequests and update documentation
--------------------------------------------------------------------------------

Key: KAFKA-1634
URL: https://issues.apache.org/jira/browse/KAFKA-1634
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Blocker
Fix For: 0.8.2

From the mailing list, following up on this: I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, so that timestamps are now always handled server-side? Would one of the devs mind updating the API spec wiki?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-1701) Improve controller and broker message handling.
[ https://issues.apache.org/jira/browse/KAFKA-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1701:
---------------------------------
Reviewer: Neha Narkhede

Improve controller and broker message handling.
-----------------------------------------------

Key: KAFKA-1701
URL: https://issues.apache.org/jira/browse/KAFKA-1701
Project: Kafka
Issue Type: Improvement
Reporter: Jiangjie Qin

This ticket is a memo for future controller refactoring. It is related to KAFKA-1547. Ideally, the broker should only follow instructions from the controller, not handle them smartly. For KAFKA-1547, the controller should filter out the partitions whose leader is not up yet before sending LeaderAndIsrRequest to a broker. The idea is that the controller should handle all the edge cases instead of letting the broker do it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-1680) JmxTool exits if no arguments are given
[ https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1680:
---------------------------------
Labels: newbie (was: )

JmxTool exits if no arguments are given
---------------------------------------

Key: KAFKA-1680
URL: https://issues.apache.org/jira/browse/KAFKA-1680
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Ryan Berdeen
Assignee: Ewen Cheslack-Postava
Priority: Minor
Labels: newbie
Attachments: KAFKA-1680.patch

JmxTool has no required arguments, but it exits if no arguments are provided. You can work around this by passing a non-option argument, which will be ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}.

It looks like this was broken in KAFKA-1291 / 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
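The underlying failure mode is a common CLI pattern: treating an empty argv as "no options given, print usage and exit", even though every option is optional or has a default. A hypothetical reconstruction of the pattern (not the actual JmxTool source; joptsimple is the parser Kafka's tools use) shows both the bug and why a throwaway non-option argument works around it:

{code}
// Hypothetical sketch of the argument-handling bug, not the real JmxTool.
import joptsimple.OptionParser;
import joptsimple.OptionSet;

public class JmxToolSketch {
    public static void main(String[] args) throws Exception {
        OptionParser parser = new OptionParser();
        parser.accepts("jmx-url").withRequiredArg().defaultsTo(
                "service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
        parser.accepts("object-name").withRequiredArg();

        // The bug: every option above is optional or defaulted, yet the tool
        // refuses to run when argv is empty.
        if (args.length == 0) {
            parser.printHelpOn(System.err);
            System.exit(1);
        }
        // A bare non-option argument like "xxx" parses fine and is ignored,
        // which is why the workaround in the ticket works.
        OptionSet options = parser.parse(args);
        System.out.println("jmx-url = " + options.valueOf("jmx-url"));
    }
}
{code}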
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168819#comment-14168819 ]

Neha Narkhede commented on KAFKA-1305:
--------------------------------------

Increasing the queue size by a little doesn't really solve the problem. We should conduct more tests on an unbounded controller queue if we have any doubt whether or not it will work. [~sriharsha] I will help you review changes to the controller, if you are up for updating your patch.

Controller can hang on controlled shutdown with auto leader balance enabled
---------------------------------------------------------------------------

Key: KAFKA-1305
URL: https://issues.apache.org/jira/browse/KAFKA-1305
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
Fix For: 0.8.2, 0.9.0
Attachments: KAFKA-1305.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
- locked java.lang.Object@6dbf14a7
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Locked ownable synchronizers:
- None

...

Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at
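The three stacks above form a classic lock-plus-bounded-queue deadlock, which also explains why an unbounded queue is being discussed as the fix. Below is a deliberately simplified, hypothetical model using only java.util.concurrent and no Kafka classes: the "scheduler" thread blocks on put() into a full queue while holding the lock that the "shutdown" thread needs before it could ever drain that queue.

{code}
// Simplified, hypothetical model of the deadlock; this program hangs by design.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ControllerDeadlockSketch {
    public static void main(String[] args) throws Exception {
        ReentrantLock controllerLock = new ReentrantLock();
        BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(1);
        requestQueue.put("pending-request"); // queue is already full; no one is draining it

        Thread scheduler = new Thread(() -> {
            controllerLock.lock(); // preferred-replica election takes the lock...
            try {
                requestQueue.put("LeaderAndIsrRequest"); // ...then blocks forever on the full queue
            } catch (InterruptedException ignored) {
            } finally {
                controllerLock.unlock();
            }
        }, "kafka-scheduler-0");

        Thread shutdown = new Thread(() -> {
            controllerLock.lock(); // controlled shutdown needs the same lock -> waits forever
            try {
                requestQueue.clear(); // would unblock the scheduler, but we never get here
            } finally {
                controllerLock.unlock();
            }
        }, "Thread-4");

        scheduler.start();
        Thread.sleep(100); // let the scheduler grab the lock first
        shutdown.start();
        // Both threads now hang, mirroring the thread dump in this ticket.
    }
}
{code}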
Re: Review Request 26564: Patch for KAFKA-1471
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/#review56365
-----------------------------------------------------------

Thanks for updating the patch, Ewen. 2 minor comments. Probably worth updating PerfConfig as well as MessageTest.

- Neha Narkhede

On Oct. 10, 2014, 5:43 p.m., Ewen Cheslack-Postava wrote:

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26564/
-----------------------------------------------------------

(Updated Oct. 10, 2014, 5:43 p.m.)

Review request for kafka.

Bugs: KAFKA-1471
    https://issues.apache.org/jira/browse/KAFKA-1471

Repository: kafka

Description
-------

KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs

Diffs
-----

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2
core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11

Diff: https://reviews.apache.org/r/26564/diff/

Testing
-------

Thanks,

Ewen Cheslack-Postava
Re: Review Request 26563: Patch for KAFKA-1692
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26563/#review56366 --- Ship it! Ship It! - Neha Narkhede On Oct. 10, 2014, 5:38 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26563/ --- (Updated Oct. 10, 2014, 5:38 p.m.) Review request for kafka. Bugs: KAFKA-1692 https://issues.apache.org/jira/browse/KAFKA-1692 Repository: kafka Description --- KAFKA-1692 Include client ID in new producer IO thread name. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f58b8508d3f813a51015abed772c704390887d7e Diff: https://reviews.apache.org/r/26563/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1692:
---------------------------------
Resolution: Fixed
Status: Resolved (was: Patch Available)

Thanks for the patch. Pushed to trunk

[Java New Producer] IO Thread Name Must include Client ID
----------------------------------------------------------

Key: KAFKA-1692
URL: https://issues.apache.org/jira/browse/KAFKA-1692
Project: Kafka
Issue Type: Improvement
Components: producer
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Trivial
Labels: newbie
Attachments: KAFKA-1692.patch

Please add the client id so people looking at JConsole or a profiling tool can identify threads by client id, since a single JVM can have multiple producer instances.

org.apache.kafka.clients.producer.KafkaProducer
{code}
String ioThreadName = "kafka-producer-network-thread";
if (clientId != null) {
    ioThreadName = ioThreadName + " | " + clientId;
}
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
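For a standalone illustration of the effect, here is a hypothetical sketch with plain java.lang.Thread standing in for Kafka's KafkaThread (a thin daemon-thread wrapper): with a client id set, each producer's IO thread gets a distinguishable name in JConsole or a thread dump.

{code}
// Hypothetical illustration of the naming pattern; "producer-42" is made up.
public class IoThreadNaming {
    public static void main(String[] args) {
        String clientId = "producer-42";                 // hypothetical client.id
        String ioThreadName = "kafka-producer-network-thread";
        if (clientId != null) {
            ioThreadName = ioThreadName + " | " + clientId;
        }
        Thread ioThread = new Thread(() -> { /* sender loop */ }, ioThreadName);
        ioThread.setDaemon(true);                        // KafkaThread runs as a daemon
        System.out.println(ioThread.getName());          // kafka-producer-network-thread | producer-42
    }
}
{code}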
[jira] [Comment Edited] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168831#comment-14168831 ]

Neha Narkhede edited comment on KAFKA-1692 at 10/12/14 11:00 PM:
-----------------------------------------------------------------

Thanks for the patch. Pushed to trunk and 0.8.2

was (Author: nehanarkhede): Thanks for the patch. Pushed to trunk

[Java New Producer] IO Thread Name Must include Client ID
----------------------------------------------------------

Key: KAFKA-1692
URL: https://issues.apache.org/jira/browse/KAFKA-1692
Project: Kafka
Issue Type: Improvement
Components: producer
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Trivial
Labels: newbie
Attachments: KAFKA-1692.patch

Please add the client id so people looking at JConsole or a profiling tool can identify threads by client id, since a single JVM can have multiple producer instances.

org.apache.kafka.clients.producer.KafkaProducer
{code}
String ioThreadName = "kafka-producer-network-thread";
if (clientId != null) {
    ioThreadName = ioThreadName + " | " + clientId;
}
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168834#comment-14168834 ]

Ewen Cheslack-Postava commented on KAFKA-1471:
----------------------------------------------

Updated reviewboard https://reviews.apache.org/r/26564/diff/ against branch origin/trunk

Add Producer Unit Tests for LZ4 and LZ4HC compression
-----------------------------------------------------

Key: KAFKA-1471
URL: https://issues.apache.org/jira/browse/KAFKA-1471
Project: Kafka
Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
Fix For: 0.8.2
Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: Review Request 26564: Patch for KAFKA-1471
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/ --- (Updated Oct. 12, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1471 https://issues.apache.org/jira/browse/KAFKA-1471 Repository: kafka Description (updated) --- KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2 core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/PerfConfig.scala 129cc013f68d2b89bdfea74a1d9ee26a011791f2 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11 core/src/test/scala/unit/kafka/message/MessageTest.scala 4837585d03535043a6f25938368988128df9b94a Diff: https://reviews.apache.org/r/26564/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-1471:
-----------------------------------------
Attachment: KAFKA-1471_2014-10-12_16:02:19.patch

Add Producer Unit Tests for LZ4 and LZ4HC compression
-----------------------------------------------------

Key: KAFKA-1471
URL: https://issues.apache.org/jira/browse/KAFKA-1471
Project: Kafka
Issue Type: Sub-task
Reporter: James Oliver
Assignee: Ewen Cheslack-Postava
Fix For: 0.8.2
Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Build failed in Jenkins: Kafka-trunk #298
See https://builds.apache.org/job/Kafka-trunk/298/changes Changes: [neha.narkhede] KAFKA-1692 Include client ID in new producer IO thread name; reviewed by Neha Narkhede -- [...truncated 2018 lines...] kafka.admin.AdminTest testPartitionReassignmentNonOverlappingReplicas FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testReassigningNonExistingPartition FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testResumePartitionReassignmentThatWasCompleted FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testPreferredReplicaJsonData FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testBasicPreferredReplicaElection FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testShutdownBroker FAILED java.net.BindException: Address already in use at 
sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.admin.AdminTest.setUp(AdminTest.scala:33) kafka.admin.AdminTest testTopicConfigChange FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at
Re: Review Request 26564: Patch for KAFKA-1471
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/#review56368 --- Ship it! Ship It! - Neha Narkhede On Oct. 12, 2014, 11:02 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/ --- (Updated Oct. 12, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1471 https://issues.apache.org/jira/browse/KAFKA-1471 Repository: kafka Description --- KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2 core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/PerfConfig.scala 129cc013f68d2b89bdfea74a1d9ee26a011791f2 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11 core/src/test/scala/unit/kafka/message/MessageTest.scala 4837585d03535043a6f25938368988128df9b94a Diff: https://reviews.apache.org/r/26564/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168838#comment-14168838 ]

Sriharsha Chintalapani commented on KAFKA-1305:
-----------------------------------------------

[~nehanarkhede] so the changes you are looking for are to remove the config option and make the LinkedBlockingQueue unbounded?

Controller can hang on controlled shutdown with auto leader balance enabled
---------------------------------------------------------------------------

Key: KAFKA-1305
URL: https://issues.apache.org/jira/browse/KAFKA-1305
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
Fix For: 0.8.2, 0.9.0
Attachments: KAFKA-1305.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
- locked java.lang.Object@6dbf14a7
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Locked ownable synchronizers:
- None

...

Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at
Re: Review Request 26564: Patch for KAFKA-1471
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/#review56369 --- Ship it! Ship It! - Neha Narkhede On Oct. 12, 2014, 11:02 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/ --- (Updated Oct. 12, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1471 https://issues.apache.org/jira/browse/KAFKA-1471 Repository: kafka Description --- KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2 core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/PerfConfig.scala 129cc013f68d2b89bdfea74a1d9ee26a011791f2 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11 core/src/test/scala/unit/kafka/message/MessageTest.scala 4837585d03535043a6f25938368988128df9b94a Diff: https://reviews.apache.org/r/26564/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Created] (KAFKA-1702) Messages silently Lost by producer
Alexis Midon created KAFKA-1702:
-----------------------------------

Summary: Messages silently Lost by producer
Key: KAFKA-1702
URL: https://issues.apache.org/jira/browse/KAFKA-1702
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Jun Rao

Hello,

we lost millions of messages because of this {{try/catch}} in the producer {{DefaultEventHandler}}: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116

If a Throwable is caught by this {{try/catch}}, the retry policy will have no effect and all yet-to-be-sent messages are lost (the error will break the loop over the broker list). This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and *all* the metrics are updated as if everything were fine. Only the abnormal drop in the producers' network I/O, or in the incoming message rate on the brokers, or alerting on errors in the producer logs could have revealed the issue.

This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that will let the retry policy do its job when such a {{Throwable}} occurs.

Thanks in advance for your help.
Alexis

ps: you might wonder how this {{try/catch}} could ever catch something, since {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. Here are the details: we use Snappy compression. When the native snappy library is not installed on the host, Snappy, during the initialization of the class {{org.xerial.snappy.Snappy}}, will [write a C library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312] in the JVM temp directory {{java.io.tmpdir}}. In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an instance reboot (thank you [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory was removed. The JVM was then running with a non-existent temp dir, the Snappy class was impossible to initialize, and the following message would be silently logged:

{code}
ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages
! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
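The failure mode distills to a few lines. The sketch below is a hypothetical Java reduction of the pattern, not the Scala source: a catch-all around the send path converts any Throwable, including errors like a failed class initialization, into a log line and an empty "failed messages" list, so the caller's retry loop believes everything succeeded.

{code}
// Hypothetical reduction of the swallowed-Throwable bug described above.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SwallowedThrowableSketch {

    // Any Throwable from the send path is logged and swallowed, and the
    // method reports "nothing left to retry".
    static List<String> dispatch(List<String> batch) {
        try {
            compressAndSend(batch);   // may throw an Error, not just an Exception
        } catch (Throwable t) {
            System.err.println("Failed to send messages: " + t);
            // BUG: fall through and report success, so the retry loop never runs
        }
        return Collections.emptyList();
    }

    // Stands in for the compression step: class initialization fails with
    // NoClassDefFoundError when the temp dir is missing.
    static void compressAndSend(List<String> batch) {
        throw new NoClassDefFoundError("Could not initialize class org.xerial.snappy.Snappy");
    }

    public static void main(String[] args) {
        List<String> failed = dispatch(Arrays.asList("m1", "m2", "m3"));
        System.out.println("messages to retry: " + failed.size()); // prints 0 -- batch silently dropped
    }
}
{code}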
[jira] [Updated] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Midon updated KAFKA-1702:
--------------------------------
Attachment: KAFKA-1702.0.patch

Messages silently Lost by producer
----------------------------------

Key: KAFKA-1702
URL: https://issues.apache.org/jira/browse/KAFKA-1702
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.1.1
Reporter: Alexis Midon
Assignee: Jun Rao
Attachments: KAFKA-1702.0.patch

Hello,

we lost millions of messages because of this {{try/catch}} in the producer {{DefaultEventHandler}}: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116

If a Throwable is caught by this {{try/catch}}, the retry policy will have no effect and all yet-to-be-sent messages are lost (the error will break the loop over the broker list). This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and *all* the metrics are updated as if everything were fine. Only the abnormal drop in the producers' network I/O, or in the incoming message rate on the brokers, or alerting on errors in the producer logs could have revealed the issue.

This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that will let the retry policy do its job when such a {{Throwable}} occurs.

Thanks in advance for your help.
Alexis

ps: you might wonder how this {{try/catch}} could ever catch something, since {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. Here are the details: we use Snappy compression. When the native snappy library is not installed on the host, Snappy, during the initialization of the class {{org.xerial.snappy.Snappy}}, will [write a C library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312] in the JVM temp directory {{java.io.tmpdir}}. In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an instance reboot (thank you [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory was removed. The JVM was then running with a non-existent temp dir, the Snappy class was impossible to initialize, and the following message would be silently logged:

{code}
ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages
! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Issue Comment Deleted] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Midon updated KAFKA-1702:
--------------------------------
Comment: was deleted

(was:
diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..0f7f941 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
-        try {
-          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-            if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent =>
-                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
-                case None => // nothing
-              }
-            })
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+          if (logger.isTraceEnabled) {
+            messagesPerBrokerMap.foreach(partitionAndEvent =>
+              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+          }
+          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+          messageSetPerBrokerOpt match {
+            case Some(messageSetPerBroker) =>
+              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+              failedTopicPartitions.foreach(topicPartition => {
+                messagesPerBrokerMap.get(topicPartition) match {
+                  case Some(data) => failedProduceRequests.appendAll(data)
+                  case None => // nothing
+                }
+              })
+            case None => // failed to group messages
+              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
           }
-        } catch {
-          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
-      case None => // all produce requests failed
+      case None => // failed to collate messages
         messages
     }
   }
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }

-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
     /** enforce the compressed.topics config here.
-      * If the compression codec is anything other than NoCompressionCodec,
-      *   Enable compression only for specified topics if any
-      *   If the list of compressed topics is empty, then enable the specified compression codec for all topics
-      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      * If the compression codec is anything other than NoCompressionCodec,
+      *   Enable compression only for specified topics if any
+      *   If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
       */
-
-    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-      val rawMessages = messages.map(_.message)
-      (topicAndPartition,
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                debug("Sending %d messages with compression codec %d to %s"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+    try {
[jira] [Commented] (KAFKA-1277) Keep the summary/description when updating the RB with kafka-patch-review
[ https://issues.apache.org/jira/browse/KAFKA-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168849#comment-14168849 ]

Neha Narkhede commented on KAFKA-1277:
--------------------------------------

[~omkreddy] Tried your latest patch.

{noformat}
nnarkhed-mn1:kafka nnarkhed$ python kafka-patch-review.py -j KAFKA-1277 -b trunk -d test
Configuring reviewboard url to https://reviews.apache.org
Updating your remote branches to pull the latest changes
Usage: rbt post [options] [changenum]

Uploads diffs to create and update review requests.

rbt: error: no such option: -a
ERROR: reviewboard update failed. Exiting.
nnarkhed-mn1:kafka nnarkhed$ rbt --version
RBTools 0.5.2
{noformat}

Keep the summary/description when updating the RB with kafka-patch-review
--------------------------------------------------------------------------

Key: KAFKA-1277
URL: https://issues.apache.org/jira/browse/KAFKA-1277
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Manikumar Reddy
Attachments: KAFKA-1277.patch, KAFKA-1277.patch, KAFKA-1277.patch, KAFKA-1277_2014-10-04_16:39:56.patch, KAFKA-1277_2014-10-04_16:51:20.patch, KAFKA-1277_2014-10-04_16:57:30.patch, KAFKA-1277_2014-10-04_17:00:37.patch, KAFKA-1277_2014-10-04_17:01:43.patch, KAFKA-1277_2014-10-04_17:03:08.patch, KAFKA-1277_2014-10-04_17:09:02.patch, KAFKA-1277_2014-10-05_11:04:33.patch, KAFKA-1277_2014-10-05_11:09:08.patch, KAFKA-1277_2014-10-05_11:10:50.patch, KAFKA-1277_2014-10-05_11:18:17.patch

Today the kafka-patch-review tool will always use a default title and description if they are not specified, even when updating an existing RB. It would be better to leave the current title/description as is.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Review Request 26633: Patch for KAFKA-1305
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26633/ --- Review request for kafka. Bugs: KAFKA-1305 https://issues.apache.org/jira/browse/KAFKA-1305 Repository: kafka Description --- KAFKA-1305. Controller can hang on controlled shutdown with auto leader balance enabled. Diffs - core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 core/src/main/scala/kafka/server/KafkaConfig.scala 90af698b01ec82b6168e02b6af41887ef164ad51 system_test/mirror_maker_testsuite/config/server.properties c6284122e3dfaa046a453511c17a19a536dc1035 system_test/offset_management_testsuite/config/server.properties 2b988f86052a7bb248e1c7752808cb0a336d0020 system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties 41ec6e49272f129ab11a08aec04faf444f1c4f26 system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties 727e23701d6c2eb08c7ddde198e042921ef058f8 system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties e6fbbe1e0532eb393a5b100c8b3c58be4c546ec0 system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties fee65bce63564a7403d311be2c27c34e17e835a7 system_test/replication_testsuite/config/server.properties 6becbab60e3943e56a77dc7872febb9764b2dca9 Diff: https://reviews.apache.org/r/26633/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168852#comment-14168852 ]

Sriharsha Chintalapani commented on KAFKA-1305:
-----------------------------------------------

Created reviewboard https://reviews.apache.org/r/26633/diff/ against branch origin/trunk

Controller can hang on controlled shutdown with auto leader balance enabled
---------------------------------------------------------------------------

Key: KAFKA-1305
URL: https://issues.apache.org/jira/browse/KAFKA-1305
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
Fix For: 0.8.2, 0.9.0
Attachments: KAFKA-1305.patch, KAFKA-1305.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
- locked java.lang.Object@6dbf14a7
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Locked ownable synchronizers:
- None

...

Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at
[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sriharsha Chintalapani updated KAFKA-1305:
------------------------------------------
Attachment: KAFKA-1305.patch

Controller can hang on controlled shutdown with auto leader balance enabled
---------------------------------------------------------------------------

Key: KAFKA-1305
URL: https://issues.apache.org/jira/browse/KAFKA-1305
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
Fix For: 0.8.2, 0.9.0
Attachments: KAFKA-1305.patch, KAFKA-1305.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
- locked java.lang.Object@6dbf14a7
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Locked ownable synchronizers:
- None

...

Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at kafka.utils.Utils$.inLock(Utils.scala:536)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:46)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

kafka-scheduler-0 - Thread t@117
java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
-
Build failed in Jenkins: Kafka-trunk #299
See https://builds.apache.org/job/Kafka-trunk/299/changes Changes: [neha.narkhede] KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver; reviewed by Neha Narkhede -- [...truncated 1529 lines...]
The following kafka.admin.AdminTest cases FAILED, each during setUp and each with an identical stack trace:
- testPartitionReassignmentNonOverlappingReplicas
- testReassigningNonExistingPartition
- testResumePartitionReassignmentThatWasCompleted
- testPreferredReplicaJsonData
- testBasicPreferredReplicaElection
- testShutdownBroker
- testTopicConfigChange
Representative stack trace (repeated verbatim for every failure above):
{code}
java.net.BindException: Address already in use
	at sun.nio.ch.Net.bind(Native Method)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
	at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
	at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
	at kafka.admin.AdminTest.setUp(AdminTest.scala:33)
{code}
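These BindExceptions indicate that the embedded ZooKeeper in the test harness tried to bind a port that an earlier test had not yet released. As a minimal sketch of the usual remedy (illustrative only; this assumes nothing about the harness itself), one can ask the OS for a free ephemeral port instead of hard-coding one:
{code}
import java.net.ServerSocket

object PortPickerSketch {
  // Binding to port 0 lets the OS pick any free ephemeral port; closing
  // the probe socket immediately frees it for the embedded ZooKeeper.
  def choosePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }
}
{code}
There is a small race between closing the probe socket and rebinding the port, but it is far less likely to collide than a fixed port shared across consecutive test runs.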
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168930#comment-14168930 ] Gwen Shapira commented on KAFKA-1555: - Awesome. So I'm updating here: http://svn.apache.org/repos/asf/kafka/site/ It looks like the right thing to do is to create a 082 subdir, copy the current documentation there, and add the new reliability guarantees. We can then re-point the main doc link to that directory when we release. I think this is less confusing than modifying the docs after the release. Does that make sense? BTW, there is probably more stuff we need to document for 0.8.2 - perhaps we should start filing doc JIRAs for the new version? The new producer will be very high on that list. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168929#comment-14168929 ] Jun Rao commented on KAFKA-1702: Thanks for the patch. Not sure why you want to wrap groupMessagesToSet in try/catch. I thought that's where the NoClassDefFoundError is thrown and you want to propagate it to the caller. With this patch, the producer in sync mode will get the exception. However, a producer in async mode will still silently drop messages. This is a general limitation and is being fixed in the new Java producer through a callback. Messages silently Lost by producer -- Key: KAFKA-1702 URL: https://issues.apache.org/jira/browse/KAFKA-1702 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.1.1 Reporter: Alexis Midon Assignee: Jun Rao Attachments: KAFKA-1702.0.patch Hello, we lost millions of messages because of this {{try/catch}} in the producer {{DefaultEventHandler}}: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116 If a Throwable is caught by this {{try/catch}}, the retry policy will have no effect and all yet-to-be-sent messages are lost (the error will break the loop over the broker list). This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and *all* the metrics are updated as if everything were fine. Only an abnormal drop in the producer's network I/O, a drop in the incoming message rate on the brokers, or alerting on errors in the producer logs could have revealed the issue. This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that will let the retry policy do its job when such a {{Throwable}} occurs. Thanks in advance for your help. Alexis ps: you might wonder how this {{try/catch}} could ever catch anything - {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. Here are the details: We use Snappy compression. When the native snappy library is not installed on the host, Snappy will, during the initialization of class {{org.xerial.snappy.Snappy}}, [write a C library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312] in the JVM temp directory {{java.io.tmpdir}}. In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an instance reboot (thank you [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory was removed, so the JVM was running with a non-existent temp dir. The Snappy class could not be initialized, and the following message would be silently logged: {code} ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages ! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
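To make the failure mode concrete, here is a minimal Scala sketch of a dispatch loop in the spirit of the patch under discussion. All names ({{sendToBroker}}, the stub types) are illustrative assumptions, not the actual {{DefaultEventHandler}} code; the point is the scope of the catch.
{code}
object DispatchSketch {
  type Message = String
  type MessageSet = Seq[Message]

  // Stubs standing in for the real producer internals (assumptions, not the 0.8.1 API).
  def groupMessagesToSet(msgs: Seq[Message]): MessageSet = msgs
  def sendToBroker(brokerId: Int, set: MessageSet): Seq[Message] = Seq.empty

  // Returns the messages that must be retried by the caller's retry loop.
  def dispatch(perBroker: Map[Int, Seq[Message]]): Seq[Message] = {
    val failed = scala.collection.mutable.ArrayBuffer[Message]()
    for ((brokerId, msgs) <- perBroker) {
      try {
        val set = groupMessagesToSet(msgs) // where the NoClassDefFoundError surfaced in KAFKA-1702
        failed ++= sendToBroker(brokerId, set)
      } catch {
        // Scoping the catch to one broker keeps the loop alive and feeds the
        // failed batch back into the retry policy instead of dropping it.
        case t: Throwable => failed ++= msgs
      }
    }
    failed.toSeq
  }
}
{code}
A catch wrapped around the whole loop, by contrast, aborts the remaining brokers' sends and returns nothing to retry, which is the silent-loss behavior the issue describes.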
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168934#comment-14168934 ] Jun Rao commented on KAFKA-1305: Perhaps we can just set the default queue size to max int for now. If we don't see any issues with this, we can make the LinkedBlockingQueue unbounded later. Could you also change the default value for auto.leader.rebalance.enable? Controller can hang on controlled shutdown with auto leader balance enabled --- Key: KAFKA-1305 URL: https://issues.apache.org/jira/browse/KAFKA-1305 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2, 0.9.0 Attachments: KAFKA-1305.patch, KAFKA-1305.patch This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) (t@113 below).
4. While that was happening, the controlled shutdown process on 265 succeeded and proceeded to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).
Relevant portions from the thread dump:
{code}
Controller-265-to-broker-265-send-thread - Thread t@113
   java.lang.Thread.State: TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.Utils$.swallow(Utils.scala:46)
	at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
	- locked java.lang.Object@6dbf14a7
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

   Locked ownable synchronizers:
	- None
...
Thread-4 - Thread t@17
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
	at kafka.utils.Utils$.inLock(Utils.scala:536)
	at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
	at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.Utils$.swallow(Utils.scala:46)
	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
	at kafka.Kafka$$anon$1.run(Kafka.scala:42)
...
kafka-scheduler-0 - Thread t@117
   java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at
{code}
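For context on the queue-size suggestion above, here is a tiny sketch of the LinkedBlockingQueue behavior in question (the queue contents are illustrative; this is not the ControllerChannelManager code):
{code}
import java.util.concurrent.LinkedBlockingQueue

object QueueBoundSketch {
  // With a small bound, put(...) blocks once the queue is full - this is
  // how a stuck send-thread can wedge the caller that enqueues requests.
  val bounded = new LinkedBlockingQueue[String](10)

  // A capacity of Int.MaxValue is effectively unbounded, so put(...) never
  // blocks; it is also exactly what the no-arg constructor defaults to.
  val effectivelyUnbounded = new LinkedBlockingQueue[String](Int.MaxValue)

  def main(args: Array[String]): Unit = {
    effectivelyUnbounded.put("LeaderAndIsrRequest") // never blocks in practice
    println(effectivelyUnbounded.size)
  }
}
{code}
Since {{new LinkedBlockingQueue()}} already defaults to an Int.MaxValue capacity, the "max int for now, unbounded later" step is mostly about changing the configured default rather than the data structure itself.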
[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168941#comment-14168941 ] Alexis Midon commented on KAFKA-1702: - I agree. In async mode, until there is a callback, the best we can do is make sure all the metrics are updated correctly, in particular ResendsPerSec and FailedSendsPerSec, which are critical for monitoring async producers. In sync mode, the producer will get the exception, which is an improvement. Thanks for your review. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
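A minimal sketch of that metrics point, assuming Yammer Metrics (which the 0.8 producer uses): the FailedSendsPerSec meter name comes from the comment above, but the surrounding helper is illustrative, not the actual producer code.
{code}
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics

object FailureAccountingSketch {
  // Illustrative meter mirroring the FailedSendsPerSec name mentioned above.
  private val failedSends =
    Metrics.newMeter(getClass, "FailedSendsPerSec", "sends", TimeUnit.SECONDS)

  // Record the failure before rethrowing, so async producers - which cannot
  // catch the exception themselves - still surface the failure in metrics.
  def sendWithAccounting(send: () => Unit): Unit =
    try send()
    catch {
      case t: Throwable =>
        failedSends.mark()
        throw t
    }
}
{code}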
[jira] [Commented] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168942#comment-14168942 ] Alexis Midon commented on KAFKA-1702: - Also, if {{#groupMessagesToSet}} is not in a try/catch, the error will break the loop over the broker list: all messages will be dropped, retries ignored, metrics not updated, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)