[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-589: Status: Patch Available (was: Open) This patch makes KafkaServer clean up after itself, but still rethrow any caught exceptions. This keeps the existing interface the same: it should still work if the caller does cleanup itself by catching exceptions and calling shutdown(), but it also cleans up if the caller doesn't, so the leftover thread won't cause a hang. Also adds a test of this behavior. Clean shutdown after startup connection failure --- Key: KAFKA-589 URL: https://issues.apache.org/jira/browse/KAFKA-589 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0, 0.7.2 Reporter: Jason Rosenberg Assignee: Swapnil Ghike Priority: Minor Labels: bugs, newbie Hi, I'm embedding the kafka server (0.7.2) in an application container. I've noticed that if I try to start the server without zookeeper being available, by default it gets a zk connection timeout after 6 seconds, and then throws an Exception out of KafkaServer.startup(). E.g., I see this stack trace: Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000 at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876) at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44) at kafka.log.LogManager.<init>(LogManager.scala:93) at kafka.server.KafkaServer.startup(KafkaServer.scala:58) So that's ok, I can catch the exception, and then shut everything down gracefully, in this case. However, when I do this, it seems there is a daemon thread still around, which doesn't quit, and so the server never actually exits the jvm. Specifically, this thread seems to hang around: "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <7f40d4be8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) at java.lang.Thread.run(Thread.java:680) Looking at the code in kafka.log.LogManager(), it does seem like it starts up the scheduler to clean logs, before then trying to connect to zk (and in this case failing): /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) } So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException, and then call KafkaServer.shutdown(), then it will stop the scheduler, and all is good.
However, it seems odd that if I get an exception when calling KafkaServer.startup(), I should still have to call KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally clean up after itself if startup() gets an exception? I'm not sure I can reliably call shutdown() after a failed startup() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
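For illustration, here is a minimal sketch of the cleanup-and-rethrow pattern the patch describes. The class and member names are hypothetical stand-ins, not the actual KafkaServer internals:
{code}
// Minimal sketch, assuming a scheduler is the only resource started before
// the failing ZooKeeper connection. startup() still propagates the original
// exception, but tears down what it already started first, so no leftover
// scheduler thread keeps the JVM from exiting.
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

class Server {
  private val scheduler = new ScheduledThreadPoolExecutor(1)

  def startup(): Unit = {
    try {
      scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = cleanupLogs() },
        60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
      connectToZk() // may throw ZkTimeoutException
    } catch {
      case e: Throwable =>
        shutdown() // clean up partially started components...
        throw e    // ...while keeping the existing contract of rethrowing
    }
  }

  def shutdown(): Unit = scheduler.shutdownNow() // safe to call a second time

  private def cleanupLogs(): Unit = ()
  private def connectToZk(): Unit =
    throw new RuntimeException("Unable to connect to zookeeper server within timeout: 6000")
}
{code}
A caller that already catches the exception and calls shutdown() itself keeps working, since shutdown() can run twice.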
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-589: Attachment: KAFKA-589-v1.patch Clean shutdown after startup connection failure --- Key: KAFKA-589 URL: https://issues.apache.org/jira/browse/KAFKA-589 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7.2, 0.8.0 Reporter: Jason Rosenberg Assignee: Swapnil Ghike Priority: Minor Labels: bugs, newbie Attachments: KAFKA-589-v1.patch Hi, I'm embedding the kafka server (0.7.2) in an application container. I've noticed that if I try to start the server without zookeeper being available, by default it gets a zk connection timeout after 6 seconds, and then throws an Exception out of KafkaServer.startup(). E.g., I see this stack trace: Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000 at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876) at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98) at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84) at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44) at kafka.log.LogManager.<init>(LogManager.scala:93) at kafka.server.KafkaServer.startup(KafkaServer.scala:58) So that's ok, I can catch the exception, and then shut everything down gracefully, in this case. However, when I do this, it seems there is a daemon thread still around, which doesn't quit, and so the server never actually exits the jvm. Specifically, this thread seems to hang around: "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <7f40d4be8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) at java.lang.Thread.run(Thread.java:680) Looking at the code in kafka.log.LogManager(), it does seem like it starts up the scheduler to clean logs, before then trying to connect to zk (and in this case failing): /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) } So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException, and then call KafkaServer.shutdown(), then it will stop the scheduler, and all is good. However, it seems odd that if I get an exception when calling KafkaServer.startup(), I should still have to call KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally clean up after itself if startup() gets an exception? I'm not sure I can reliably call shutdown() after a failed startup() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1631: - Attachment: KAFKA-1631-v1.patch This patch fixes things in a way that I think [~rberdeen] would find sensible. The issue was that the test for under-replication was comparing the current set of assigned replicas against the number of in sync replicas. But during the reassignment that isn't really correct because if you, e.g., move all replicas to different brokers then you'll have more than the target # of replicas. The fix is to look up the active set of reassignments and, if one is active for the partition, use that reassignment info to determine the correct # of replicas; otherwise, we can fall back on the assigned replica set. Note that this does mean that reassignments that increase the number of replicas will show up as under-replicated, which I think may be the case [~nehanarkhede] was hoping to fix. It's arguable which approach is correct (i.e. should the new target # of replicas apply as soon as the reassignment is issued or once it's completed). As for the replication factor being reported -- that is the number of currently assigned replicas for the first partition and has a number of issues. 1. It can be higher than the real target number of replicas as described above. 2. It's also not really correct to have it on the topic summary line since it varies by partition. 3. Finally, it's not even just the value for partition 0 because it's just using the head of a Map. If we're ok with changing the output formatting here, I can clean that part up as well, maybe by adding ReplicationFactor to each partition line and making it use the value used when determining under-replication. ReplicationFactor and under-replicated partitions incorrect during reassignment --- Key: KAFKA-1631 URL: https://issues.apache.org/jira/browse/KAFKA-1631 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1631-v1.patch We have a topic with a replication factor of 3. We monitor UnderReplicatedPartitions as recommended by the documentation. During a partition reassignment, partitions being reassigned are reported as under-replicated. Running a describe shows: {code} Topic:activity-wal-1 PartitionCount:15 ReplicationFactor:5 Configs: Topic: activity-wal-1 Partition: 0 Leader: 14 Replicas: 14,13,12,11,15 Isr: 14,12,11,13 Topic: activity-wal-1 Partition: 1 Leader: 14 Replicas: 15,14,11 Isr: 14,11 Topic: activity-wal-1 Partition: 2 Leader: 11 Replicas: 11,15,12 Isr: 12,11,15 ... {code} It looks like the displayed replication factor, 5, is simply the number of replicas listed for the first partition, which includes both brokers in the current list and those onto which the partition is being reassigned. Partition 0 is also included in the list when using the `--under-replicated-partitions` option, even though it is replicated to more brokers than the true replication factor. During a reassignment, the under-replicated partitions metric is not usable, meaning that actual under-replicated partitions can go unnoticed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
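A sketch of the check the patch describes, with hypothetical helper names (the real lookup goes through the controller's reassignment state):
{code}
// Sketch of the under-replication check described above (illustrative names).
// During a reassignment the assigned replica list is temporarily old+new, so
// the target replica count must come from the active reassignment instead.
case class TopicAndPartition(topic: String, partition: Int)

def isUnderReplicated(tp: TopicAndPartition,
                      assignedReplicas: Seq[Int],
                      isr: Seq[Int],
                      activeReassignments: Map[TopicAndPartition, Seq[Int]]): Boolean = {
  // e.g. moving a partition from (1,2,3) to (4,5,6): assigned is (1,2,3,4,5,6)
  // mid-move, but the check compares the ISR against the 3 target replicas.
  val targetReplicas = activeReassignments.getOrElse(tp, assignedReplicas)
  isr.size < targetReplicas.size
}
{code}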
[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148680#comment-14148680 ] Ewen Cheslack-Postava commented on KAFKA-1631: -- Right. Unfortunately most of the system isn't aware of the large scale change (reassign old set -> new set), only of each intermediate state (old set -> old set + new set -> new set). As it stands, the UnderReplicatedPartitions are computed by the Partition class, which is created by ReplicaManager. But the high-level reassignment is managed by KafkaController, which looks like the only place the necessary state is maintained. I think getting the semantics you want may require a much more substantial change since each partition leader will need to know about the partition reassignment rather than just the controller. On the other hand, while I think it's less than ideal, the current behavior could certainly be argued to be reasonable -- i.e. that reassignment is not natively supported, it's just a higher-level operation you can build up. In this case, the intermediate step is expected, and the temporary reporting of under-replication would make sense since for a time the desired replication of (old set + new set) has not been achieved. ReplicationFactor and under-replicated partitions incorrect during reassignment --- Key: KAFKA-1631 URL: https://issues.apache.org/jira/browse/KAFKA-1631 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1631-v1.patch We have a topic with a replication factor of 3. We monitor UnderReplicatedPartitions as recommended by the documentation. During a partition reassignment, partitions being reassigned are reported as under-replicated. Running a describe shows: {code} Topic:activity-wal-1 PartitionCount:15 ReplicationFactor:5 Configs: Topic: activity-wal-1 Partition: 0 Leader: 14 Replicas: 14,13,12,11,15 Isr: 14,12,11,13 Topic: activity-wal-1 Partition: 1 Leader: 14 Replicas: 15,14,11 Isr: 14,11 Topic: activity-wal-1 Partition: 2 Leader: 11 Replicas: 11,15,12 Isr: 12,11,15 ... {code} It looks like the displayed replication factor, 5, is simply the number of replicas listed for the first partition, which includes both brokers in the current list and those onto which the partition is being reassigned. Partition 0 is also included in the list when using the `--under-replicated-partitions` option, even though it is replicated to more brokers than the true replication factor. During a reassignment, the under-replicated partitions metric is not usable, meaning that actual under-replicated partitions can go unnoticed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26563: Patch for KAFKA-1692
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26563/ --- Review request for kafka. Bugs: KAFKA-1692 https://issues.apache.org/jira/browse/KAFKA-1692 Repository: kafka Description --- KAFKA-1692 Include client ID in new producer IO thread name. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f58b8508d3f813a51015abed772c704390887d7e Diff: https://reviews.apache.org/r/26563/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1692: - Assignee: Ewen Cheslack-Postava (was: Jun Rao) Status: Patch Available (was: Open) [Java New Producer] IO Thread Name Must include Client ID --- Key: KAFKA-1692 URL: https://issues.apache.org/jira/browse/KAFKA-1692 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Trivial Labels: newbie Attachments: KAFKA-1692.patch Please add the client id so people who are looking at JConsole or a profiling tool can see the thread by client id, since a single JVM can have multiple producer instances. org.apache.kafka.clients.producer.KafkaProducer {code} String ioThreadName = "kafka-producer-network-thread"; if (clientId != null) { ioThreadName = ioThreadName + " | " + clientId; } this.ioThread = new KafkaThread(ioThreadName, this.sender, true); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
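As a usage sketch (property names follow the released new-producer configs; the thread-name format follows the snippet above):
{code}
// Hypothetical usage: give each producer a client.id so its IO thread is
// identifiable in jstack/JConsole when one JVM runs several producers.
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "billing-producer") // surfaces in the thread name
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
// A thread dump should now show: "kafka-producer-network-thread | billing-producer"
{code}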
[jira] [Updated] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1692: - Attachment: KAFKA-1692.patch [Java New Producer] IO Thread Name Must include Client ID --- Key: KAFKA-1692 URL: https://issues.apache.org/jira/browse/KAFKA-1692 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Trivial Labels: newbie Attachments: KAFKA-1692.patch Please add the client id so people who are looking at JConsole or a profiling tool can see the thread by client id, since a single JVM can have multiple producer instances. org.apache.kafka.clients.producer.KafkaProducer {code} String ioThreadName = "kafka-producer-network-thread"; if (clientId != null) { ioThreadName = ioThreadName + " | " + clientId; } this.ioThread = new KafkaThread(ioThreadName, this.sender, true); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14167169#comment-14167169 ] Ewen Cheslack-Postava commented on KAFKA-1692: -- Created reviewboard https://reviews.apache.org/r/26563/diff/ against branch origin/trunk [Java New Producer] IO Thread Name Must include Client ID --- Key: KAFKA-1692 URL: https://issues.apache.org/jira/browse/KAFKA-1692 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Trivial Labels: newbie Attachments: KAFKA-1692.patch Please add the client id so people who are looking at JConsole or a profiling tool can see the thread by client id, since a single JVM can have multiple producer instances. org.apache.kafka.clients.producer.KafkaProducer {code} String ioThreadName = "kafka-producer-network-thread"; if (clientId != null) { ioThreadName = ioThreadName + " | " + clientId; } this.ioThread = new KafkaThread(ioThreadName, this.sender, true); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26564: Patch for KAFKA-1471
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/ --- Review request for kafka. Bugs: KAFKA-1471 https://issues.apache.org/jira/browse/KAFKA-1471 Repository: kafka Description --- KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2 core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11 Diff: https://reviews.apache.org/r/26564/diff/ Testing --- Thanks, Ewen Cheslack-Postava
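A hedged sketch of how a codec-parameterized round-trip test can pick up the new codecs (illustrative names, not the literal ProducerCompressionTest diff):
{code}
// The same produce/consume check runs once per compression type, with the
// new "lz4" and "lz4hc" codecs simply added to the parameter list.
import java.util.{ArrayList, Collection}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

@RunWith(classOf[Parameterized])
class CompressionRoundTripTest(compression: String) {
  // ... start a broker, produce a batch with the codec set to `compression`,
  // consume it back, and assert the payloads round-trip intact ...
}

object CompressionRoundTripTest {
  @Parameters
  def parameters: Collection[Array[String]] = {
    val params = new ArrayList[Array[String]]()
    for (codec <- Seq("none", "gzip", "snappy", "lz4", "lz4hc"))
      params.add(Array(codec))
    params
  }
}
{code}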
[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1471: - Attachment: KAFKA-1471.patch Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Fix For: 0.8.2 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14167178#comment-14167178 ] Ewen Cheslack-Postava commented on KAFKA-1471: -- Created reviewboard https://reviews.apache.org/r/26564/diff/ against branch origin/trunk Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Fix For: 0.8.2 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14167183#comment-14167183 ] Ewen Cheslack-Postava commented on KAFKA-1471: -- This updated version applies cleanly (just had a bit of fuzz), removes a few unnecessarily changed lines, and fixes some typos. Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26566: Patch for KAFKA-1680
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26566/ --- Review request for kafka. Bugs: KAFKA-1680 https://issues.apache.org/jira/browse/KAFKA-1680 Repository: kafka Description --- KAFKA-1680 Standardize command line argument parsing and usage messages. At its heart, this was just a test of args.length that was invalid for this command, but 6b0ae4bba0d introduced the same potential issue across all the command line tools. This standardizes all the command line tools on a cleaner parsing pattern by pushing most of the work into CommandLineUtils and printing usage info for any type of parsing exception. Ideally, the long-term solution would be to use a newer version of joptsimple that allows us to express constraints on arguments to get almost all command line option issues resolved at parse time. Diffs - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/ConsoleConsumer.scala 323fc8566d974acc4e5c7d7c2a065794f3b5df4a core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d1e7c434e77859d746b8dc68dd5d5a3740425e79 core/src/main/scala/kafka/tools/ConsumerPerformance.scala 093c800ea7f8a9c972bb66e99ac4e4d431cf11cc core/src/main/scala/kafka/tools/DumpLogSegments.scala 8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d core/src/main/scala/kafka/tools/ExportZkOffsets.scala 4d051bc2db12f0e15aa6a3289abeb9dd25d373d2 core/src/main/scala/kafka/tools/GetOffsetShell.scala 3d9293e4abbe3f4a4a2bc5833385747c604d5a95 core/src/main/scala/kafka/tools/ImportZkOffsets.scala abe09721b13f71320510fd1a01c1917470450c6e core/src/main/scala/kafka/tools/JmxTool.scala 1d1a120c45ff70fbd60df5b147ca230eb1ef50de core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 core/src/main/scala/kafka/tools/ProducerPerformance.scala f61c7c701fd85caabc2d2950a7b02aa85e5cdfe3 core/src/main/scala/kafka/tools/ReplayLogProducer.scala 3393a3dd574ac45a27bf7eda646b737146c55038 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 7602b8d705970a5dab49ed36d117346a960701ac core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b4f903b6c7c3bb725cac7c05eb1f885906413c4d core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala aef8361b73a0934641fc4f5cee942b5b50f3e7d7 core/src/main/scala/kafka/utils/CommandLineUtils.scala 086a62483fad0c9cfc7004ff94c890cfb9929fa6 core/src/main/scala/kafka/utils/ToolsUtils.scala fef93929ea03e181f87fe294c06d9bc9fc823e9e core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 7211c2529c1db76100432737da7a1d1d221dfba0 Diff: https://reviews.apache.org/r/26566/diff/ Testing --- Thanks, Ewen Cheslack-Postava
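A sketch of the pattern of funneling parse failures into one shared helper that prints usage (illustrative of CommandLineUtils-style code, not the exact patch):
{code}
// Any joptsimple parse failure or missing required option goes through one
// helper that prints the message plus usage and exits, instead of each tool
// doing its own ad hoc args.length checks.
import joptsimple.{OptionParser, OptionSet, OptionSpec}

object CommandLineHelper {
  def parseOrDie(parser: OptionParser, args: Array[String]): OptionSet =
    try parser.parse(args: _*)
    catch {
      case e: joptsimple.OptionException => printUsageAndDie(parser, e.getMessage)
    }

  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*): Unit =
    for (arg <- required if !options.has(arg))
      printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")

  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
    System.err.println(message)
    parser.printHelpOn(System.err)
    sys.exit(1)
  }
}
{code}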
[jira] [Updated] (KAFKA-1680) JmxTool exits if no arguments are given
[ https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1680: - Attachment: KAFKA-1680.patch JmxTool exits if no arguments are given --- Key: KAFKA-1680 URL: https://issues.apache.org/jira/browse/KAFKA-1680 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Ryan Berdeen Priority: Minor Attachments: KAFKA-1680.patch JmxTool has no required arguments, but it exits if no arguments are provided. You can work around this by passing a non-option argument, which will be ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}. It looks like this was broken in KAFKA-1291 / 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1680) JmxTool exits if no arguments are given
[ https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1680: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) JmxTool exits if no arguments are given --- Key: KAFKA-1680 URL: https://issues.apache.org/jira/browse/KAFKA-1680 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1680.patch JmxTool has no required arguments, but it exits if no arguments are provided. You can work around this by passing a non-option argument, which will be ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}. It looks like this was broken in KAFKA-1291 / 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1680) JmxTool exits if no arguments are given
[ https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14167228#comment-14167228 ] Ewen Cheslack-Postava commented on KAFKA-1680: -- Created reviewboard https://reviews.apache.org/r/26566/diff/ against branch origin/trunk JmxTool exits if no arguments are given --- Key: KAFKA-1680 URL: https://issues.apache.org/jira/browse/KAFKA-1680 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Ryan Berdeen Priority: Minor Attachments: KAFKA-1680.patch JmxTool has no required arguments, but it exits if no arguments are provided. You can work around this by passing a non-option argument, which will be ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}. It looks like this was broken in KAFKA-1291 / 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14168834#comment-14168834 ] Ewen Cheslack-Postava commented on KAFKA-1471: -- Updated reviewboard https://reviews.apache.org/r/26564/diff/ against branch origin/trunk Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26564: Patch for KAFKA-1471
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26564/ --- (Updated Oct. 12, 2014, 11:02 p.m.) Review request for kafka. Bugs: KAFKA-1471 https://issues.apache.org/jira/browse/KAFKA-1471 Repository: kafka Description (updated) --- KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 79d57f9bf31606ffa5400f2f12356eba84703cc2 core/src/main/scala/kafka/tools/ConsoleProducer.scala 8e9ba0b284671989f87d9c421bc98f5c4384c260 core/src/main/scala/kafka/tools/PerfConfig.scala 129cc013f68d2b89bdfea74a1d9ee26a011791f2 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 17e2c6e9dfd789acb4b6db37c780c862667e4e11 core/src/test/scala/unit/kafka/message/MessageTest.scala 4837585d03535043a6f25938368988128df9b94a Diff: https://reviews.apache.org/r/26564/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1471: - Attachment: KAFKA-1471_2014-10-12_16:02:19.patch Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-979: Assignee: Ewen Cheslack-Postava (was: Sriram Subramanian) Status: Patch Available (was: Open) Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-979.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-979: Attachment: KAFKA-979.patch Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Sriram Subramanian Labels: newbie Attachments: KAFKA-979.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26663: Patch for KAFKA-979
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/ --- Review request for kafka. Bugs: KAFKA-979 https://issues.apache.org/jira/browse/KAFKA-979 Repository: kafka Description --- KAFKA-979 Add optional random jitter for time based log rolling. Diffs - core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011 core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68 core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b Diff: https://reviews.apache.org/r/26663/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170196#comment-14170196 ] Ewen Cheslack-Postava commented on KAFKA-979: - Created reviewboard https://reviews.apache.org/r/26663/diff/ against branch origin/trunk Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Sriram Subramanian Labels: newbie Attachments: KAFKA-979.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description --- KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. This covers a few cases besides the one identified in the bug. Aside from a major refactoring to use Sets for broker/replica lists, sanitizing user input seems to be the best solution here. I chose to generate errors instead of just using toSet since a duplicate entry may indicate that a different broker id was accidentally omitted. Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
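A sketch of sanitizing a user-supplied replica list (hypothetical helper, not the literal patch):
{code}
// An error beats a silent toSet because a duplicate entry often means a
// different broker id was accidentally omitted.
def parseReplicaList(input: String): Seq[Int] = {
  val brokerIds = input.split(",").map(_.trim.toInt).toSeq
  val duplicates = brokerIds.groupBy(identity).collect { case (id, ids) if ids.size > 1 => id }
  require(duplicates.isEmpty,
    "Duplicate broker ids in replica assignment: " + duplicates.mkString(","))
  brokerIds
}

// parseReplicaList("1,2,2") throws IllegalArgumentException mentioning broker 2,
// while parseReplicaList("1,2,3") returns Seq(1, 2, 3).
{code}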
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170264#comment-14170264 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Created reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1653.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26667: Patch for KAFKA-1698
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26667/ --- Review request for kafka. Bugs: KAFKA-1698 https://issues.apache.org/jira/browse/KAFKA-1698 Repository: kafka Description --- KAFKA-1698 Validate parsed ConfigDef values in addition to the default values. Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 227309e8c62f9fc435722f28f2deff4a48e30853 clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 09a82feeb7cae95209e54d3554224915a1498ebd Diff: https://reviews.apache.org/r/26667/diff/ Testing --- Thanks, Ewen Cheslack-Postava
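For illustration, a minimal sketch of the fix's shape (the real change is in the Java ConfigDef; these Scala types are simplified stand-ins):
{code}
// Minimal sketch, assuming stand-in types: the validator must run on the
// value actually parsed from the user's properties, not only on the default
// value supplied when the key was defined.
trait Validator { def ensureValid(name: String, value: Any): Unit }

case class ConfigKey(name: String, default: Any, validator: Validator)

def parse(key: ConfigKey, props: java.util.Map[String, Any]): Any = {
  val value = if (props.containsKey(key.name)) props.get(key.name) else key.default
  if (key.validator != null)
    key.validator.ensureValid(key.name, value) // before the fix, only defaults were validated
  value
}
{code}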
[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1698: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) Validator.ensureValid() only validates default config value --- Key: KAFKA-1698 URL: https://issues.apache.org/jira/browse/KAFKA-1698 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Ewen Cheslack-Postava Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1698.patch We should use it to validate the actual configured value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1698: - Attachment: KAFKA-1698.patch Validator.ensureValid() only validates default config value --- Key: KAFKA-1698 URL: https://issues.apache.org/jira/browse/KAFKA-1698 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1698.patch We should use it to validate the actual configured value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170280#comment-14170280 ] Ewen Cheslack-Postava commented on KAFKA-1698: -- Created reviewboard https://reviews.apache.org/r/26667/diff/ against branch origin/trunk Validator.ensureValid() only validates default config value --- Key: KAFKA-1698 URL: https://issues.apache.org/jira/browse/KAFKA-1698 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1698.patch We should use it to validate the actual configured value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26710: Patch for KAFKA-1637
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/ --- Review request for kafka. Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637 Repository: kafka Description --- KAFKA-1637 Return correct error code and offsets for OffsetFetchRequest for unknown topics/partitions vs no associated consumer. Diffs - core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0 Diff: https://reviews.apache.org/r/26710/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1637: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 0.8.1.1, error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1637: - Attachment: KAFKA-1637.patch SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Labels: newbie Attachments: KAFKA-1637.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 0.8.1.1, error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171610#comment-14171610 ] Ewen Cheslack-Postava commented on KAFKA-1637: -- Created reviewboard https://reviews.apache.org/r/26710/diff/ against branch origin/trunk SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Labels: newbie Attachments: KAFKA-1637.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 0.8.1.1, error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171621#comment-14171621 ] Ewen Cheslack-Postava commented on KAFKA-1637: -- The error code is for UnknownTopicOrPartition, which may have been correct if the request was for a non-existent topic or partition. Previously the code seemed to be doing the correct thing, reporting this error and returning invalid offset when the consumer hadn't started reading from that group. But KAFKA-1012 (a670537aa337) actually changed that behavior. The provided patch tries to cover the different possible scenarios (missing topic, invalid partition, and valid TopicAndPartition but a consumer with no offset for it). One potential caveat is auto topic creation since it could be reasonable to not return UnknownTopicOrPartition for a missing topic in that case. I'm not sure we really want different behavior in that case though. SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 0.8.1.1, error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
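A sketch of the case split described in the comment, using made-up container types (error code 3 is UnknownTopicOrPartition):
{code}
// Sketch of the intended case analysis (hypothetical types, not the actual
// KafkaApis code): an unknown topic/partition is a real error (code 3),
// while a known partition with no committed offset returns no error, empty
// metadata, and offset -1 per the documented protocol.
case class TopicAndPartition(topic: String, partition: Int)
case class OffsetMetadataAndError(offset: Long, metadata: String, error: Short)

val NoError: Short = 0
val UnknownTopicOrPartitionCode: Short = 3

def fetchOffset(group: String,
                tp: TopicAndPartition,
                knownPartitions: Set[TopicAndPartition],
                committed: Map[(String, TopicAndPartition), Long]): OffsetMetadataAndError =
  if (!knownPartitions.contains(tp))
    OffsetMetadataAndError(-1L, "", UnknownTopicOrPartitionCode)
  else committed.get((group, tp)) match {
    case Some(offset) => OffsetMetadataAndError(offset, "", NoError)
    case None         => OffsetMetadataAndError(-1L, "", NoError) // no offset is not an error
  }
{code}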
Re: Review Request 26663: Patch for KAFKA-979
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/ --- (Updated Oct. 14, 2014, 10:33 p.m.) Review request for kafka. Bugs: KAFKA-979 https://issues.apache.org/jira/browse/KAFKA-979 Repository: kafka Description (updated) --- Add new options log.roll.jitter.ms and log.roll.jitter.hours to add random jitter to time-based log rolling so logs aren't likely to roll at exactly the same time. Jitter always reduces the timeout so log.roll.ms still provides a soft maximum. Defaults to 0 so no jitter is added by default. Addressing warning and Util.abs comments. Diffs (updated) - core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011 core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68 core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b Diff: https://reviews.apache.org/r/26663/diff/ Testing --- Thanks, Ewen Cheslack-Postava
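A sketch of the jitter scheme the description implies (illustrative, not the exact LogSegment diff):
{code}
// Each segment draws its jitter once, bounded by both log.roll.jitter.ms and
// segment.ms; the jitter only ever reduces the effective roll age, so
// log.roll.ms stays a soft maximum.
import java.util.Random

// Mask to non-negative, in the spirit of the Utils.abs comment in the review
// (plain math.abs mishandles Long.MinValue).
def nonNegative(n: Long): Long = n & 0x7fffffffffffffffL

def drawRollJitterMs(segmentMs: Long, jitterMs: Long, rand: Random): Long =
  if (jitterMs == 0) 0L
  else nonNegative(rand.nextLong()) % math.min(jitterMs, segmentMs)

def shouldRoll(segmentAgeMs: Long, segmentMs: Long, rollJitterMs: Long): Boolean =
  segmentAgeMs > segmentMs - rollJitterMs
{code}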
[jira] [Commented] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14171648#comment-14171648 ] Ewen Cheslack-Postava commented on KAFKA-979: - Updated reviewboard https://reviews.apache.org/r/26663/diff/ against branch origin/trunk Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-979.patch, KAFKA-979_2014-10-14_15:33:31.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-979: Attachment: KAFKA-979_2014-10-14_15:33:31.patch Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-979.patch, KAFKA-979_2014-10-14_15:33:31.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172513#comment-14172513 ] Ewen Cheslack-Postava commented on KAFKA-1637: -- Updated reviewboard https://reviews.apache.org/r/26710/diff/ against branch origin/trunk SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 08.1.1 Error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26710: Patch for KAFKA-1637
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/ --- (Updated Oct. 15, 2014, 4:08 p.m.) Review request for kafka. Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637 Repository: kafka Description (updated) --- Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions. Diffs (updated) - core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0 Diff: https://reviews.apache.org/r/26710/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1637: - Attachment: KAFKA-1637_2014-10-15_09:08:12.patch SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 08.1.1 Error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error, or if there is no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26663: Patch for KAFKA-979
On Oct. 15, 2014, 4:19 a.m., Joel Koshy wrote: core/src/main/scala/kafka/log/Log.scala, line 515 https://reviews.apache.org/r/26663/diff/2/?file=721126#file721126line515 Thinking about this a bit more: do you think it would be safer to interpret jitter as an additive value to segmentMs? i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; (and limit segment.rollJitterMs to an interval of [0, config.segmentMs] which you are already doing.) Otherwise if a user happens to set a high jitter time then nearly empty segments roll often (with high probability). Another way to interpret it is as a jitter window. i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; and limit segment.rollJitterMs to an interval of [-config.segmentMs / 2, config.segmentMs / 2] Thoughts? I considered all of these options. The reason I went with the approach in the patch is that it preserves the meaning of segmentJitterMs which says it is a maximum, albeit soft, time. That said, since this needs to be explicitly enabled by setting the new parameter to be non-zero, I don't think it would be unreasonable to expect someone enabling it to understand the implications. I personally think the final option that causes the average time to be config.segmentMs is most intuitive, but as long as the effect is clearly documented they are all effectively equivalent assuming uniform sampling. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56652 --- On Oct. 14, 2014, 10:33 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/ --- (Updated Oct. 14, 2014, 10:33 p.m.) Review request for kafka. Bugs: KAFKA-979 https://issues.apache.org/jira/browse/KAFKA-979 Repository: kafka Description --- Add new options log.roll.jitter.ms and log.roll.jitter.hours to add random jitter to time-based log rolling so logs aren't likely to roll at exactly the same time. Jitter always reduces the timeout so log.roll.ms still provides a soft maximum. Defaults to 0 so no jitter is added by default. Addressing warning and Util.abs comments. Diffs - core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011 core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68 core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b Diff: https://reviews.apache.org/r/26663/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 26663: Patch for KAFKA-979
On Oct. 15, 2014, 4:19 a.m., Joel Koshy wrote: core/src/main/scala/kafka/log/Log.scala, line 515 https://reviews.apache.org/r/26663/diff/2/?file=721126#file721126line515 Thinking about this a bit more: do you think it would be safer to interpret jitter as an additive value to segmentMs? i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; (and limit segment.rollJitterMs to an interval of [0, config.segmentMs] which you are already doing.) Otherwise if a user happens to set a high jitter time then nearly empty segments roll often (with high probability). Another way to interpret it is as a jitter window. i.e., the actual age for rolling will be config.segmentMs + segment.rollJitterMs; and limit segment.rollJitterMs to an interval of [-config.segmentMs / 2, config.segmentMs / 2] Thoughts? Ewen Cheslack-Postava wrote: I considered all of these options. The reason I went with the approach in the patch is that it preserves the meaning of segmentJitterMs which says it is a maximum, albeit soft, time. That said, since this needs to be explicitly enabled by setting the new parameter to be non-zero, I don't think it would be unreasonable to expect someone enabling it to understand the implications. I personally think the final option that causes the average time to be config.segmentMs is most intuitive, but as long as the effect is clearly documented they are all effectively equivalent assuming uniform sampling. Neha Narkhede wrote: Thinking about this more in the context of how we initially found this problem. Users want to use time based rolling alongside time based retention since it makes it easier to reason about the time window of data in a segment. This is mainly useful when resetting offset based on time since offsets are returned only on segment boundaries. From a user perspective, time based rolling is just supposed to work out of the box and not have performance implications in large clusters, which in fact it does today. This is also very nuanced for most users to understand and work around and almost everyone would just expect Kafka to do the right thing. Essentially, I'm arguing this not be a configurable constant value but a value derived from performance tests done by us. Even if it has to be exposed through a config, it seems better for it to be a function of the segment roll time instead of a constant value. This way, people don't have to worry about it except in rare cases where it needs to be tuned, and even then it is difficult to screw up. It might make sense to run a stress test (possibly using some tweaked version of StressTestLog.scala). Joel Koshy wrote: I think Ewen's evaluation criterion is a useful one. i.e., what is the average age going to be. In the current patch, the age ranges from [segment.ms - randomJitter, segment.ms] where randomJitter ranges from [0, min(jitterMs, segment.ms)]. If jitterMs == segment.ms the average age will be segment.ms / 2. If the age ranges from [segment.ms, segment.ms + randomJitter] the average age will be segment.ms + segment.ms / 2. If the age ranges from [segment.ms - randomJitter / 2, segment.ms + randomJitter / 2] the average age will be segment.ms - which is the most intuitive. @Neha I actually think all this will be an interim solution until we get timestamps into the message metadata and log index. There is a thread discussing that. When that is done I think we can do with only size-based rolling and time-based retention can be done by using message header metadata.
The concerns here are somewhat opposed -- Joel is interested in good, intuitive control over the exact effects of using jitter, while Neha wants things to just work. I assume this issue only comes up when you have a large enough deployment that a lot of logs can roll at once, in which case you're probably tweaking a bunch of settings anyway. I'm also not sure we could come up with one good constant since the problem scales with the # of partitions. I think the best we could do is come up with a conservative maximum # of partitions/logs (per disk?) to support well without tweaking, then measure an average fsync time and choose a default based on that. Then again, for the "just works" case, the default roll time is 1 week, so even a small jitter (e.g. minutes) would have little impact on the timing while still providing more than enough jitter. I think the most useful input here would be from an ops person who could say what their ideal is (and whether they think a constant value could reasonably solve the problem). - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56652
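For concreteness, here is a minimal sketch of the three jitter interpretations discussed above, expressed as the effective roll age of one segment. This is an editorial illustration, not the actual patch; the helper name, the string-typed mode, and the uniform sampling are assumptions.

{code}
import scala.util.Random

// Effective roll age for one segment under each interpretation discussed
// above. segmentMs is config.segment.ms; jitterMs is assumed to already be
// clamped to [0, segmentMs].
def rollAge(segmentMs: Long, jitterMs: Long, mode: String): Long = {
  val jitter = (Random.nextDouble() * jitterMs).toLong // uniform in [0, jitterMs)
  mode match {
    case "subtractive" => segmentMs - jitter                 // current patch: average = segmentMs - jitterMs / 2
    case "additive"    => segmentMs + jitter                 // average = segmentMs + jitterMs / 2
    case "centered"    => segmentMs - jitterMs / 2 + jitter  // window around segmentMs: average = segmentMs
  }
}
{code}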
Review Request 26770: Patch for KAFKA-1108
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/ --- Review request for kafka. Bugs: KAFKA-1108 https://issues.apache.org/jira/browse/KAFKA-1108 Repository: kafka Description --- KAFKA-1108 Log IOException messages during controlled shutdown. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 07c0a078ffa5142441f687da851472da732c3837 Diff: https://reviews.apache.org/r/26770/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Attachment: KAFKA-1108.patch when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and then if there's a failure, it will retry the controlledShutdown. Looking at the code, there are 2 ways a retry could fail, one with an error response from the controller, and this messaging code:
{code}
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
{code}
Alternatively, there could be an IOException, with this code executed:
{code}
catch {
  case ioe: java.io.IOException =>
    channel.disconnect()
    channel = null
    // ignore and try again
}
{code}
And then finally, in either case:
{code}
if (!shutdownSuceeded) {
  Thread.sleep(config.controlledShutdownRetryBackoffMs)
  warn("Retrying controlled shutdown after the previous attempt failed...")
}
{code}
It would be nice if the nature of the IOException were logged in either case (I'd be happy with an ioe.getMessage() instead of a full stack trace, as kafka in general tends to be too willing to dump IOException stack traces!). I suspect, in my case, the actual IOException is a socket timeout, as the time between the initial "Starting controlled shutdown" and the first "Retrying..." message is usually about 35 seconds (the socket timeout + the controlled shutdown retry backoff). So, it would seem that really, the issue in this case is that controlled shutdown is taking too long. It would seem sensible instead to have the controller report back to the server (before the socket timeout) that more time is needed, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
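A minimal sketch of the kind of change being requested, inserted into the catch block quoted above. The wording and log level are illustrative, not the committed patch:

{code}
catch {
  case ioe: java.io.IOException =>
    channel.disconnect()
    channel = null
    // Surface the reason for the failed attempt; getMessage avoids dumping
    // a full stack trace on every retry.
    warn("Error during controlled shutdown, will retry: " + ioe.getMessage)
}
{code}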
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14172775#comment-14172775 ] Ewen Cheslack-Postava commented on KAFKA-1108: -- Created reviewboard https://reviews.apache.org/r/26770/diff/ against branch origin/trunk when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26710: Patch for KAFKA-1637
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26710/ --- (Updated Oct. 15, 2014, 9:47 p.m.) Review request for kafka. Bugs: KAFKA-1637 https://issues.apache.org/jira/browse/KAFKA-1637 Repository: kafka Description (updated) --- Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions. Updating naming Diffs (updated) - core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 1586243d20d6a181a1bd9f07e1c9493596005b32 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 2d9325045ac1ac2d7531161b32c98c847125cbf0 Diff: https://reviews.apache.org/r/26710/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173018#comment-14173018 ] Ewen Cheslack-Postava commented on KAFKA-1637: -- Updated reviewboard https://reviews.apache.org/r/26710/diff/ against branch origin/trunk SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch, KAFKA-1637_2014-10-15_14:47:21.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group, the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1 (link below). However, in Kafka 0.8.1.1 error code '3' is returned, which effectively makes it impossible for the client to decide whether there was an error or there is simply no offset associated with that topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
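A minimal sketch of the documented broker-side semantics the patch restores. The offsetsTable lookup is hypothetical (not the actual KafkaApis code); it assumes ErrorMapping.NoError is the "no error" code and that error code 3 corresponds to ErrorMapping.UnknownTopicOrPartitionCode:

{code}
// When no offset is stored for the group/topic/partition, return offset -1
// with empty metadata and NO error code, instead of error code 3.
val result: OffsetMetadataAndError = offsetsTable.get(topicAndPartition) match {
  case Some(stored) => OffsetMetadataAndError(stored.offset, stored.metadata, ErrorMapping.NoError)
  case None         => OffsetMetadataAndError(-1L, "", ErrorMapping.NoError)
}
{code}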
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1637: - Attachment: KAFKA-1637_2014-10-15_14:47:21.patch SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch, KAFKA-1637_2014-10-15_14:47:21.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26811: Patch for KAFKA-1196
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26811/ --- Review request for kafka. Bugs: KAFKA-1196 https://issues.apache.org/jira/browse/KAFKA-1196 Repository: kafka Description --- KAFKA-1196 WIP Ensure FetchResponses don't exceed 2GB limit. Diffs - core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala a5386a03b62956bc440b40783247c8cdf7432315 Diff: https://reviews.apache.org/r/26811/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173941#comment-14173941 ] Ewen Cheslack-Postava commented on KAFKA-1196: -- Created reviewboard https://reviews.apache.org/r/26811/diff/ against branch origin/trunk java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33 --- Key: KAFKA-1196 URL: https://issues.apache.org/jira/browse/KAFKA-1196 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: running java 1.7, linux and kafka compiled against scala 2.9.2 Reporter: Gerrit Jansen van Vuuren Priority: Blocker Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1196.patch I have 6 topics, each with 8 partitions, spread over 4 kafka servers. The servers are 24-core with 72 GB of RAM. While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps being thrown. I've tracked it down to FetchResponse.scala line 33. The error happens when the FetchResponsePartitionData object's readFrom method calls: messageSetBuffer.limit(messageSetSize). I put in some debug code: the messageSetSize is 671758648, while buffer.capacity() gives 155733313, so for some reason the buffer is smaller than the required message size. I don't know the consumer code well enough to debug this. It doesn't matter whether compression is used or not. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1196: - Assignee: Ewen Cheslack-Postava Status: Patch Available (was: Open) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33 --- Key: KAFKA-1196 URL: https://issues.apache.org/jira/browse/KAFKA-1196 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: running java 1.7, linux and kafka compiled against scala 2.9.2 Reporter: Gerrit Jansen van Vuuren Assignee: Ewen Cheslack-Postava Priority: Blocker Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1196.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14173943#comment-14173943 ] Ewen Cheslack-Postava commented on KAFKA-1196: -- This is a WIP patch to fix this issue, which previous discussion suggests was due to the FetchResponse exceeding 2GB. My approach to triggering the issue, however, doesn't exhibit exactly the same symptom, but it does cause an unrecoverable error that causes the consumer connection to terminate. (For reference, it causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete and sendSize is negative due to overflow. This confuses the server since it looks like the message is already done sending, and the server forcibly closes the consumer's connection.) The patch addresses the core issue by ensuring the returned message doesn't exceed 2GB, dropping parts of it in a way that otherwise shouldn't affect the consumer. But there are a lot of points that still need to be addressed:
* I started by building an integration test to trigger the issue, included in PrimitiveApiTest. However, since we necessarily need more than 2GB of data to trigger the issue, it's probably too expensive to include this way. Offline discussion suggests a system test would be a better place for it. It's still included here for completeness.
* The implementation filters down to a subset of the data in FetchResponse. The main reason for this is that the process needs to know the exact (or at least a conservative estimate of the) size of the serialized data, which only FetchResponse knows. But it's also a bit weird compared to other message classes, which are case classes and don't modify their inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove random elements until we get below the limit (see the sketch after this list). This is simple to understand and avoids starvation of specific TopicAndPartitions. Any concerns with this basic approach?
* I'm pretty sure I've managed to keep the normal (under 2GB) case at effectively the same computational cost (computing the serialized size, grouped data, etc. exactly once, as before). However, for the over-2GB case I've only ensured correctness. In particular, the progressive removal and reevaluation of the serialized size could potentially be very bad for very large data sets (e.g., starting a mirror maker against a large data set with a large # of partitions from scratch).
* Note that the algorithm never deals with the actual message data, only metadata about what messages are available. This is relevant since it's what suggested the approach in the patch could still be performant -- ReplicaManager.readMessageSets processes the entire FetchRequest and filters it down because the metadata involved is relatively small.
* Based on the previous two points, this really needs some more realistic large-scale system tests to make sure this approach is not only correct but also provides reasonable performance (or indicates we need to revise the algorithm for selecting a subset of the data).
* Testing isn't really complete -- I triggered the issue with 4 topics * 600 MB/topic, which is over 2GB. Another obvious case to check is when some partitions contain over 2GB on their own.
* I'd like someone to help sanity check the exact maximum FetchResponse serialized size we limit messages to. It's not Int.MaxValue, because the FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size. I'd like a sanity check that the extra 4 bytes is enough -- is there any additional wrapping we might need to account for? Getting a test to hit exactly that narrow range could be tricky.
* The tests include both immediate-response and purgatory paths, but the purgatory version requires a timeout in the test, which could end up being flaky and wasting time; it doesn't look like there's a great way to mock that right now. Maybe this doesn't matter if it moves to a system test?
* One case this doesn't handle yet is when the data exceeds 2GB after it's in the purgatory. The result is correct, but the response is not sent as soon as that condition is satisfied. This is because evaluating it exactly would require calling readMessageSets and evaluating the size of the message for every DelayedFetch.isSatisfied call, which sounds like it could end up being pretty expensive. Maybe there's a better way, perhaps an approximate scheme?
* The test requires some extra bytes in the fetchSize for each partition, presumably for overhead in encoding. I haven't tracked down exactly how big that should be, but I'm guessing it could end up affecting the results of more comprehensive tests.
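A minimal sketch of the random-removal idea described above, operating only on per-partition size estimates. This is an editorial illustration, not the actual patch; it assumes kafka.common.TopicAndPartition on the classpath and treats the header overhead as a simple constant:

{code}
import scala.util.Random
import kafka.common.TopicAndPartition

// Drop random partitions from the response metadata until the estimated
// serialized size (plus a fixed header overhead) fits under the limit.
def fitUnderLimit(sizes: Map[TopicAndPartition, Int],
                  limit: Long,
                  headerOverhead: Int = 4): Map[TopicAndPartition, Int] = {
  var remaining = sizes
  while (remaining.nonEmpty &&
         remaining.values.map(_.toLong).sum + headerOverhead > limit) {
    val victim = remaining.keys.toIndexedSeq(Random.nextInt(remaining.size))
    remaining -= victim // random choice avoids starving specific partitions
  }
  remaining
}
{code}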
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174178#comment-14174178 ] Ewen Cheslack-Postava commented on KAFKA-1710: -- This looks like a red herring due to the structure of the test. The test code generates 200 threads which share 4 producers, and each thread round-robins through the producers, then sleeps for 10ms. It looks like all that's happening is that the profiling tool sees the same stack trace repeatedly because there's a huge amount of contention for the 4 producers. If you take a look at the stack traces, they're almost all waiting on a lock on a queue that messages get appended to. The few active threads have those queues locked and are working on compressing data before sending it out. Given the number of threads and the small number of producers, it's not surprising that YourKit sees the same stack traces for a long time -- the threads can be making forward progress, but any time the profiler stops to look at the stack traces, it's very likely that any given thread will be waiting on a lock with the same stack trace. None of the stack traces show any evidence of a real deadlock (i.e., I can't find any set of locks where there could be ordering issues, since almost every thread is just waiting on one lock in one of the producers). If this did hit deadlock, the process should stop entirely, because all the worker threads use all 4 producers and the supposedly deadlocked threads are all waiting on locks in the producer. I ran the test to completion multiple times without any issues. Unless this has actually been observed to hit deadlock and stop making progress, I think this should be closed, since these messages are really just warnings from YourKit. [~Bmis13] you might try reducing the # of threads and seeing if those charts end up looking better. I bet if you actually showed all the threads instead of just the couple in the screenshot, the areas marked as runnable across all threads would sum to a reasonable total. Also, there are other possible issues with getting good performance from this test code, e.g., the round-robin approach can cause all threads to get blocked on the same producer if the producer gets locked for a relatively long time. This can happen when data is ready to be sent and is getting compressed. Other approaches to distributing work across the producers may provide better throughput. [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump Hi Kafka Dev Team, When I run the test to send messages to a single partition for 3 minutes or so, I encounter deadlock (please see the attached screenshots) and thread contention from YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicate Old Producer behavior for sticking to a partition for 3 minutes.
Here is the output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174222#comment-14174222 ] Ewen Cheslack-Postava commented on KAFKA-1108: -- Updated reviewboard https://reviews.apache.org/r/26770/diff/ against branch origin/trunk when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged
[ https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1108: - Attachment: KAFKA-1108_2014-10-16_13:53:11.patch when controlled shutdown attempt fails, the reason is not always logged --- Key: KAFKA-1108 URL: https://issues.apache.org/jira/browse/KAFKA-1108 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.9.0 Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26770: Patch for KAFKA-1108
On Oct. 16, 2014, 5:55 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/server/KafkaServer.scala, line 239 https://reviews.apache.org/r/26770/diff/1/?file=722474#file722474line239 Should this be WARN instead? ERROR wouldn't be ideal since this operation is retried later. Also wondering if this message actually gives much information about the reason for the failure? It might just print out IOException. I think the reason for failure that people might understand is what might cause the IOException. How about improving the error message by saying that a possible cause for this error is that the leader movement operation on the controller took longer than the configured socket.timeout.ms? This will encourage users to inspect whether socket.timeout.ms needs to be bumped up, or to inspect why the controller is taking so long to move the leaders away from this broker.

The INFO level just matched similar messages a few lines above, although this is a more significant issue than those. The newest patch updates it to WARN. The message is also more detailed, but ideally the IOException message would also contain more than just the class name. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/#review56956 --- On Oct. 16, 2014, 8:53 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26770/ --- (Updated Oct. 16, 2014, 8:53 p.m.) Review request for kafka. Bugs: KAFKA-1108 https://issues.apache.org/jira/browse/KAFKA-1108 Repository: kafka Description --- More informative message and increase log level to warn. Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 07c0a078ffa5142441f687da851472da732c3837 Diff: https://reviews.apache.org/r/26770/diff/ Testing --- Thanks, Ewen Cheslack-Postava
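A sketch of the shape of the improved logging discussed above. The exact wording is illustrative, not necessarily the committed text:

{code}
warn(("Error during controlled shutdown, possibly because leader movement took longer than the " +
  "configured socket.timeout.ms: %s").format(ioe.getMessage))
{code}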
Re: Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 16, 2014, 9:54 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description (updated) --- Generate error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionCommand since they may be difficult to find in large reassignments. Diffs (updated) - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1653: - Attachment: KAFKA-1653_2014-10-16_14:54:07.patch Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174319#comment-14174319 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Updated reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26666: Patch for KAFKA-1653
On Oct. 16, 2014, 6:10 p.m., Neha Narkhede wrote: Since you fixed some other tools as well, can we also fix the preferred replica election command where we can de-dup the partitions?

This was already removing duplicates; I had it generate an exception instead, since duplicates may indicate a config error. I'm assuming that's what you meant here. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/#review56958 --- On Oct. 16, 2014, 9:54 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 16, 2014, 9:54 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description --- Generate error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionCommand since they may be difficult to find in large reassignments. Diffs - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
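A minimal sketch of the duplicate check being discussed. The helper and the exception type are illustrative; the actual patch reports duplicates per command with more context:

{code}
// Report duplicated broker ids in a proposed replica list, e.g. Seq(1, 2, 2) -> 2.
def duplicates(replicas: Seq[Int]): Iterable[Int] =
  replicas.groupBy(identity).collect { case (id, ids) if ids.size > 1 => id }

val dups = duplicates(Seq(1, 2, 2))
if (dups.nonEmpty)
  throw new IllegalArgumentException("Duplicate broker ids in replica assignment: " + dups.mkString(","))
{code}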
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14174373#comment-14174373 ] Ewen Cheslack-Postava commented on KAFKA-1710: -- [~Bmis13] That approach just pushes the problem into KafkaAsyncProducer's thread that processes messages -- there won't be lock contention in KafkaProducer since KafkaAsyncProducer will be the only user of it, but you may not get an improvement in throughput because ultimately you're limited to the time a single thread can get. It may even get *slower* because you'll have more runnable threads at any given time, which means that the KafkaAsyncProducer worker thread will get less CPU time. Even disregarding that, since you used a LinkedBlockingQueue, that will become your new source of contention (since it must be synchronized internally). If you have a very large capacity, that'll let the threads continue to make progress and contention will be lower since the time spent adding an item is very small, but it will cost a lot of memory since you're just adding a layer of buffering. That might be useful if you have bursty traffic (the buffer allows you to temporarily buffer more data while the KafkaProducer works on getting it sent), but if you have sustained traffic you'll just have constantly growing memory usage. If the capacity is small, then the threads producing messages will eventually end up blocked waiting for space in the queue. Probably the biggest issue here is that this test only writes to a single partition in a single topic. You could improve performance by using more partitions in that topic. You're already writing to all producers from all threads, so you must not need the ordering guarantees of a single partition. If you still want a single partition, you can improve performance by using more producers, which will spread the contention across more queues (see the sketch below). Since you already have 4 that you're running round-robin on, I'd guess adding more shouldn't be a problem. In any case, this use case seems a bit odd. Are you really going to have 200 threads generating messages *as fast as they can* with only 4 producers? As far as this issue is concerned, the original report said the problem was deadlock, but that doesn't seem to be the case. If you're just worried about performance, it probably makes more sense to move the discussion over to the mailing list. It'll probably be seen by more people, and there will probably be multiple suggestions for improvements to your approach before we have to make changes to the Kafka code. [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump
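A sketch of the simplest restructuring suggested above: spread contention over more producers and give each worker a stable producer rather than round-robining per send. The createProducer() factory is hypothetical; broker config, serializers, and the actual producer type are elided:

{code}
// Spread contention over more producers: with 8 instead of 4, roughly half
// as many threads queue on each producer's internal append lock.
def buildPool[P](numProducers: Int, createProducer: () => P): IndexedSeq[P] =
  IndexedSeq.fill(numProducers)(createProducer())

// Each worker picks a producer by thread id rather than round-robining per
// send, so a temporarily blocked producer doesn't stall every thread in turn.
def producerFor[P](pool: IndexedSeq[P], threadId: Int): P =
  pool(threadId % pool.size)
{code}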
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Assignee: Ewen Cheslack-Postava (was: Jun Rao) Status: Patch Available (was: Open) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)
Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14175360#comment-14175360 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- Created reviewboard https://reviews.apache.org/r/26885/diff/ against branch origin/trunk [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao Attachments: KAFKA-1642.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of poll timeouts in Sender/RecordAccumulator. First, the timeout was being computed by RecordAccumulator as it looked up which nodes had data to send, but the timeout cannot be computed until after the nodes that aren't ready for sending have been filtered out: otherwise a node that is currently unreachable can always return a timeout of 0 and trigger a busy loop. The fixed version computes per-node timeouts and only computes the final timeout after the nodes that aren't ready for sending are removed. Second, timeouts were only being computed based on the first TopicAndPartition encountered for each node, which could produce incorrect timeouts if the first one encountered didn't have the minimum timeout for that node. The fix evaluates every TopicAndPartition with a known leader and takes the minimum. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
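A minimal sketch of the corrected timeout logic described above. The function shape, names, and pre-grouping of timeouts by leader node are assumptions for illustration, not the actual Sender/RecordAccumulator code:

{code}
// Per-node minimum over all partition timeouts, with unready nodes dropped
// *before* taking the final minimum, so an unreachable node can never force
// a 0ms timeout and a busy loop.
def pollTimeoutMs[Node](timeoutsByNode: Map[Node, Seq[Long]],
                        isReady: Node => Boolean,
                        defaultBackoffMs: Long): Long = {
  val perNodeMin = timeoutsByNode.collect {
    case (node, timeouts) if isReady(node) && timeouts.nonEmpty => timeouts.min
  }
  if (perNodeMin.isEmpty) defaultBackoffMs else perNodeMin.min
}
{code}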
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Attachment: KAFKA-1642.patch [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao Attachments: KAFKA-1642.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14175430#comment-14175430 ] Ewen Cheslack-Postava commented on KAFKA-1710: -- bq. The deadlock will occur sometimes, depending on thread scheduling and how long they are blocked.

Deadlock has a specific definition -- two or more threads that are both waiting on each other such that neither can make any forward progress -- and as far as I can tell this isn't triggering a deadlock. From what I've seen, this is simply an issue of anywhere from 50-200 threads trying to access a shared, synchronized resource. This is just contention; everything continues to make progress. The test program runs to completion just fine. As for performance, I have no doubt there are improvements to be made in the producer implementation, but you'll get a far bigger performance boost with careful design in your system. I already mentioned multiple ways you can improve performance that, based on your current test code, shouldn't affect anything else. Here's a quick example (using a lightly modified version of your code against a local test cluster):
{quote}
Existing setup (4 producers, 1 partition):
All Producers done...!
All done...!
real 1m50.135s
user 1m45.019s
sys 1m53.219s
{quote}
{quote}
8 producers, 1 partition (and parameters adjusted to generate the same # of msgs):
All Producers done...!
All done...!
real 0m55.465s
user 1m27.132s
sys 1m1.144s
{quote}
Nothing surprising, but since you haven't specified a constraint on the # of producers, this seems like the simplest solution to improve performance. [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump
[jira] [Resolved] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-1710. -- Resolution: Invalid Assignee: Ewen Cheslack-Postava [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition Key: KAFKA-1710 URL: https://issues.apache.org/jira/browse/KAFKA-1710 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Critical Labels: performance Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, th6.dump, th7.dump, th8.dump, th9.dump -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: blog with some out of the box pains
The first issue he runs into is one I also find frustrating -- with cloud providers pushing SSDs, you have to use a pretty large instance type to get a reasonable test setup. I'm not sure if he couldn't launch an older type like m1.large (I think some newer AWS accounts aren't able to) or if he just didn't see it as an option since they are hidden by default. Even the largest general purpose instance types are pretty wimpy with respect to storage -- only 80GB of local instance storage. The hostname issues are a well known pain point and unfortunately there aren't any great solutions that aren't EC2-specific. Here's a quick rundown: * None of the images for popular distros on EC2 will auto-set the hostname beyond what EC2 already sets up (which isn't publicly routable). The following details might explain why they can't. For example, a recent Ubuntu image gives: ubuntu@ip-172-30-2-76:~$ hostname ip-172-30-2-76 ubuntu@ip-172-30-2-76:~$ cat /etc/hosts 127.0.0.1 localhost # The following lines are desirable for IPv6 capable hosts ::1 ip6-localhost ip6-loopback --- cut irrelevant pieces --- * Sometimes the hostname is set, but isn't useful. For example, in this Ubuntu image, the hostname is set to ip-[ip-address], but that isn't routable, so it generates really irritating behavior. Running on the server itself (which is running in a VPC, see below for more details): scala> InetAddress.getLocalHost java.net.UnknownHostException: ip-172-30-2-76: ip-172-30-2-76: Name or service not known at java.net.InetAddress.getLocalHost(InetAddress.java:1473) at .<init>(<console>:9) at .<clinit>(<console>) at .<init>(<console>:11) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:704) at scala.tools.nsc.interpreter.IMain$Request$$anonfun$14.apply(IMain.scala:920) at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43) at scala.tools.nsc.io.package$$anon$2.run(package.scala:25) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.UnknownHostException: ip-172-30-2-76: Name or service not known at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901) at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293) at java.net.InetAddress.getLocalHost(InetAddress.java:1469) ... 14 more * As described in a bunch of places, the only reliable way to get public DNS info is through EC2's own instance metadata API: https://forums.aws.amazon.com/thread.jspa?threadID=77788 For example: curl -s http://169.254.169.254/latest/meta-data/public-hostname might give something like: ec2-203-0-113-25.compute-1.amazonaws.com * But you may not even *have* a public DNS hostname. If you launch in a VPC, you'll only get one if you set the VPC to generate them (and I'm pretty sure the default is to not create them): http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/vpc-dns.html The output of the curl call above will just be empty. * AWS is pretty aggressively trying to move away from EC2-Classic (i.e. non-VPC instances), so most new instances will end up in VPCs unless you are working in a grandfathered account + AZ.
If VPC without public DNS is the default, we'll have to carefully guide new users in generating a setup that works properly if we try to use hostnames. * Even if you try moving to IP addresses, you still have to deal with VPCs. You can't directly get your public IP address without accessing something outside the host since you're in a VPC. You need to use the instance metadata API to look it up, i.e., curl -s http://169.254.169.254/latest/meta-data/public-ipv4 * And yet another problem with IPs: unless you use an elastic IP, you're not guaranteed they'll be stable: Auto-assign Public IP Requests a public IP address from Amazon's public IP address pool, to make your instance reachable from the Internet. In most cases, the public IP address is associated with the instance until it’s stopped or terminated, after which it’s no longer available for you to use. If you require a persistent public IP address that you can associate and disassociate at will, use an Elastic IP address (EIP) instead. You can allocate your own EIP, and associate it to your instance after launch. I know Spark had some similar issues -- using their (very convenient!) ec2 script, you still ended up with some stuff
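To make the lookup-and-fallback pattern above concrete, here is a minimal, hedged Java sketch of querying the instance metadata API and falling back from public hostname to public IP. The two endpoints are the ones mentioned above; the class name, timeouts, and fallback policy are just illustrative assumptions.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class Ec2PublicAddress {
    private static final String META = "http://169.254.169.254/latest/meta-data/";

    // Fetch one metadata value; returns "" on errors or when the field is
    // absent (e.g. public-hostname in a VPC without public DNS enabled).
    static String metadata(String path) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(META + path).openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            if (conn.getResponseCode() != 200)
                return "";
            try (BufferedReader r = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line = r.readLine();
                return line == null ? "" : line.trim();
            }
        } catch (IOException e) {
            return "";
        }
    }

    public static void main(String[] args) {
        String addr = metadata("public-hostname");   // empty in a VPC without public DNS
        if (addr.isEmpty())
            addr = metadata("public-ipv4");          // may also be empty without a public IP
        System.out.println(addr.isEmpty() ? "no public address assigned" : addr);
    }
}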
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 21, 2014, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description (updated) --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
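To illustrate the two fixes described in the review, here is a small, self-contained Java sketch of the corrected scan: sendable batches contribute ready nodes (their timeout would be 0), only not-yet-sendable batches shape the poll timeout, and the wait condition picks the retry backoff over the linger time when a batch is in retry. The types and names are hypothetical stand-ins, not the actual RecordAccumulator code.

import java.util.*;

public class ReadyScanSketch {
    static class Batch {
        final String leader; final long lastAttemptMs; final boolean full; final boolean inRetry;
        Batch(String leader, long lastAttemptMs, boolean full, boolean inRetry) {
            this.leader = leader; this.lastAttemptMs = lastAttemptMs; this.full = full; this.inRetry = inRetry;
        }
    }

    static long scan(List<Batch> batches, long now, long lingerMs, long retryBackoffMs, Set<String> readyNodes) {
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        for (Batch b : batches) {
            long waited = now - b.lastAttemptMs;
            long mustWait = b.inRetry ? retryBackoffMs : lingerMs; // fix 2: respect the retry backoff
            boolean sendable = b.full || waited >= mustWait;
            if (sendable)
                readyNodes.add(b.leader); // would have timeout 0; must not drag the delay down
            else
                // fix 1: only data that cannot be sent yet shapes the poll timeout
                nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, mustWait - waited);
        }
        return nextReadyCheckDelayMs;
    }

    public static void main(String[] args) {
        Set<String> ready = new HashSet<>();
        List<Batch> batches = Arrays.asList(
                new Batch("node1", 0, true, false),    // full: sendable now
                new Batch("node2", 50, false, false)); // must linger 50ms more
        System.out.println(scan(batches, 100, 100, 200, ready) + " " + ready); // prints: 50 [node1]
    }
}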
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Attachment: KAFKA-1642_2014-10-20_17:33:57.patch [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177723#comment-14177723 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- To summarize the issues fixed now: * Fix a logic issue with the expired check in RecordAccumulator.ready * Don't include nodes that can send data when computing the delay until the next check for ready data. Including these doesn't make sense since their delays will change when we send data. * To correctly account for nodes with sendable data, use a timeout of 0 if we send any. This guarantees any necessary delay is computed immediately in the next round after some current data has been removed. * Properly account for nodes with sendable data under connection retry backoff. Since they weren't included in computing the next check delay when looking up ready nodes, we need to account for it later, but only if we conclude the node isn't ready. We need to incorporate the amount of backoff time still required before a retry will be performed (nothing else would wake up at the right time, unlike other conditions like a full buffer which only change if data is received). It might be possible to break this into smaller commits for each one, but the order in which they are applied needs care because some of them, by themselves, result in bad behavior -- the existing client worked because it often ended up with poll timeouts that were much more aggressive (i.e., often 0). [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
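Putting the last two bullets together, here is a hedged sketch of the resulting poll-timeout decision on the sender side: send immediately (timeout 0) if anything survives the readiness filter, otherwise bound the poll by whichever comes first, the next-check delay or the remaining connection backoff. The interface and names are hypothetical stand-ins, not the real KafkaClient/Sender API.

import java.util.*;

public class PollTimeoutSketch {
    interface Client {
        boolean ready(String node, long now);        // connection usable for sending?
        long connectionDelay(String node, long now); // remaining backoff for a non-ready node
    }

    static long pollTimeout(Set<String> readyNodes, long nextReadyCheckDelayMs, Client client, long now) {
        long notReadyTimeout = Long.MAX_VALUE;
        for (Iterator<String> it = readyNodes.iterator(); it.hasNext(); ) {
            String node = it.next();
            if (!client.ready(node, now)) {
                // Node has sendable data but can't send (e.g. connection backoff);
                // nothing else will wake us at the right time, so bound the poll.
                it.remove();
                notReadyTimeout = Math.min(notReadyTimeout, client.connectionDelay(node, now));
            }
        }
        if (!readyNodes.isEmpty())
            return 0; // we'll send now; recompute all delays immediately next round
        return Math.min(nextReadyCheckDelayMs, notReadyTimeout);
    }

    public static void main(String[] args) {
        Client backingOff = new Client() {
            public boolean ready(String node, long now) { return false; }
            public long connectionDelay(String node, long now) { return 75; } // 75ms of backoff left
        };
        Set<String> ready = new HashSet<>(Collections.singletonList("node1"));
        // node1 had sendable data but is backing off: poll for 75ms, not forever and not 0.
        System.out.println(pollTimeout(ready, Long.MAX_VALUE, backingOff, 0)); // prints: 75
    }
}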
Re: [Java New Producer] Snappy NPE Issue
I took a quick look at this since I noticed the same issue when testing your code for the issues you filed. I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. -Ewen On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote: Hi Kafka Dev, I am getting the following issue with the Snappy library. I checked the code for the Snappy lib and it seems to be fine. Have you guys seen this issue? 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Here is the code for Snappy (http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153), line 153: if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { Thanks, Bhavesh
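For the record, a hedged sketch of the thread-ID idea floated above: serialize access to streams that share a creator (and therefore a ThreadLocal BufferRecycler). The wrapper below is purely illustrative, not the actual Compressor change, and the class and method names are made up.

import java.util.concurrent.ConcurrentHashMap;

public class SnappyStreamLocks {
    // One lock per creating thread: streams created by the same thread share
    // that thread's BufferRecycler, so they must not be used concurrently.
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    // Call at stream-creation time and keep the returned lock with the stream.
    public static Object lockForCreator() {
        return LOCKS.computeIfAbsent(Thread.currentThread().getId(), id -> new Object());
    }
}

A Compressor-like wrapper would record lockForCreator() next to each snappy stream when it is built and then wrap writes and close() in synchronized (lock) { ... }; codecs other than snappy could skip the lock entirely.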
[jira] [Created] (KAFKA-1721) Snappy compressor is not thread safe
Ewen Cheslack-Postava created KAFKA-1721: Summary: Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Java New Producer] Snappy NPE Issue
Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this since it either requires an updated version of the upstream library, a workaround by us, or at a bare minimum clear documentation of the issue. On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote: I took a quick look at this since I noticed the same issue when testing your code for the issues you filed. I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. -Ewen On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote: Hi Kafka Dev, I am getting the following issue with the Snappy library. I checked the code for the Snappy lib and it seems to be fine. Have you guys seen this issue?
2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Here is the code for Snappy (http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153), line 153: if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { Thanks, Bhavesh
Re: Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 21, 2014, 6:58 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description (updated) --- Generate an error for duplicates in PreferredReplicaLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionsCommand since they may be difficult to find in large reassignments. Report duplicate topics and duplicate topic partitions in ReassignPartitionsCommand. Make all duplication error messages include details about what item was duplicated. Diffs (updated) - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb core/src/main/scala/kafka/utils/Utils.scala 29d5a17d4a03cfd3f3cdd2994cbd783a4be2732e core/src/main/scala/kafka/utils/ZkUtils.scala a7b1fdcb50d5cf930352d37e39cb4fc9a080cb12 Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
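As a hedged illustration of the duplicate detection being added (the real patch is Scala, touching kafka.utils.Utils and the command classes listed above; the Java names below are made up):

import java.util.*;

public class Duplicates {
    // Return each distinct item that appears more than once, in first-seen order.
    static <T> List<T> duplicates(Collection<T> items) {
        Map<T, Integer> counts = new LinkedHashMap<>();
        for (T item : items)
            counts.merge(item, 1, Integer::sum);
        List<T> dups = new ArrayList<>();
        for (Map.Entry<T, Integer> e : counts.entrySet())
            if (e.getValue() > 1)
                dups.add(e.getKey());
        return dups;
    }

    public static void main(String[] args) {
        List<Integer> replicas = Arrays.asList(1, 2, 2); // the 1,2,2 case from KAFKA-1653
        List<Integer> dups = duplicates(replicas);
        if (!dups.isEmpty())
            // Error message names the duplicated items so they are easy to
            // find even in a large reassignment.
            System.out.println("Duplicate broker ids in replica assignment: " + dups); // prints [2]
    }
}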
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178845#comment-14178845 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Updated reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, KAFKA-1653_2014-10-21_11:57:50.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1653: - Attachment: KAFKA-1653_2014-10-21_11:57:50.patch Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, KAFKA-1653_2014-10-21_11:57:50.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1725) Configuration file bugs in system tests add noise to output and break a few tests
Ewen Cheslack-Postava created KAFKA-1725: Summary: Configuration file bugs in system tests add noise to output and break a few tests Key: KAFKA-1725 URL: https://issues.apache.org/jira/browse/KAFKA-1725 Project: Kafka Issue Type: Bug Components: tools Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor There are some broken and misnamed system test configuration files (testcase_*_properties.json) that are causing a bunch of exceptions when running system tests and make it a lot harder to parse the output. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 27060: Patch for KAFKA-1725
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27060/ --- Review request for kafka. Bugs: KAFKA-1725 https://issues.apache.org/jira/browse/KAFKA-1725 Repository: kafka Description --- KAFKA-1725: Clean up system test output: fix typo in system test case file, incorrectly named system test configuration files, and skip trying to generate metrics graphs when no data is available. Diffs - system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json 308f1937bbdc0fdcebdb8e9bc39e643c3f0c18be system_test/replication_testsuite/testcase_10101/testcase_0101_properties.json system_test/replication_testsuite/testcase_10102/testcase_0102_properties.json system_test/replication_testsuite/testcase_10103/testcase_0103_properties.json system_test/replication_testsuite/testcase_10104/testcase_0104_properties.json system_test/replication_testsuite/testcase_10105/testcase_0105_properties.json system_test/replication_testsuite/testcase_10106/testcase_0106_properties.json system_test/replication_testsuite/testcase_10107/testcase_0107_properties.json system_test/replication_testsuite/testcase_10108/testcase_0108_properties.json system_test/replication_testsuite/testcase_10109/testcase_0109_properties.json system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json system_test/utils/metrics.py d98d3cdeab00be9ddf4b7032a68da3886e4850c7 Diff: https://reviews.apache.org/r/27060/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1725) Configuration file bugs in system tests add noise to output and break a few tests
[ https://issues.apache.org/jira/browse/KAFKA-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1725: - Attachment: KAFKA-1725.patch Configuration file bugs in system tests add noise to output and break a few tests - Key: KAFKA-1725 URL: https://issues.apache.org/jira/browse/KAFKA-1725 Project: Kafka Issue Type: Bug Components: tools Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1725.patch There are some broken and misnamed system test configuration files (testcase_*_properties.json) that are causing a bunch of exceptions when running system tests and make it a lot harder to parse the output. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1725) Configuration file bugs in system tests add noise to output and break a few tests
[ https://issues.apache.org/jira/browse/KAFKA-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1725: - Status: Patch Available (was: Open) Configuration file bugs in system tests add noise to output and break a few tests - Key: KAFKA-1725 URL: https://issues.apache.org/jira/browse/KAFKA-1725 Project: Kafka Issue Type: Bug Components: tools Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1725.patch There are some broken and misnamed system test configuration files (testcase_*_properties.json) that are causing a bunch of exceptions when running system tests and make it a lot harder to parse the output. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1725) Configuration file bugs in system tests add noise to output and break a few tests
[ https://issues.apache.org/jira/browse/KAFKA-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14180753#comment-14180753 ] Ewen Cheslack-Postava commented on KAFKA-1725: -- Created reviewboard https://reviews.apache.org/r/27060/diff/ against branch origin/trunk Configuration file bugs in system tests add noise to output and break a few tests - Key: KAFKA-1725 URL: https://issues.apache.org/jira/browse/KAFKA-1725 Project: Kafka Issue Type: Bug Components: tools Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Priority: Minor Attachments: KAFKA-1725.patch There are some broken and misnamed system test configuration files (testcase_*_properties.json) that are causing a bunch of exceptions when running system tests and make it a lot harder to parse the output. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] 0.8.2-beta Release Candidate 1
Yeah, the version on Precise is really old, 1.0, and on Trusty is 1.4. Using the PPA version worked fine for me on Trusty. -Ewen On Thu, Oct 23, 2014, at 10:02 AM, Joe Stein wrote: hmmm. I just launched a bootstrap clean room (per release docs). Installed latest gradle sudo add-apt-repository ppa:cwchien/gradle sudo apt-get update sudo apt-get install gradle wget https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/kafka-0.8.2-beta-src.tgz tar -xvf kafka-0.8.2-beta-src.tgz cd kafka-0.8.2-beta-src/ gradle ... Building project 'core' with Scala version 2.10.1 :downloadWrapper BUILD SUCCESSFUL Total time: 22.066 secs so it works, I use the source upload to build the binaries. odd. * Maybe the gradle version you have is too old or something? I am using vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle -version Gradle 2.1 Build time: 2014-09-08 10:40:39 UTC Build number: none Revision: e6cf70745ac11fa943e19294d19a2c527a669a53 Groovy: 2.3.6 Ant: Apache Ant(TM) version 1.9.3 compiled on December 23 2013 JVM: 1.6.0_45 (Sun Microsystems Inc. 20.45-b01) OS: Linux 3.2.0-23-generic amd64 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Thu, Oct 23, 2014 at 12:49 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Joe, I downloaded the source distro and ran into this error. Followed the steps listed on the release process wiki to validate the release. vagrant@precise64:~/kafka-0.8.2-beta-src$ gradle FAILURE: Build failed with an exception. * Where: Script '/home/vagrant/kafka-0.8.2-beta-src/scala.gradle' line: 2 * What went wrong: A problem occurred evaluating script. Cause: Could not find property 'ext' on settings 'kafka-0.8.2-beta-src'. * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 1.565 secs vagrant@precise64:~/kafka-0.8.2-beta-src$ cat scala.gradle if (!hasProperty('scalaVersion')) { ext.scalaVersion = '2.10.1' } ext.defaultScalaVersion = '2.10.1' if (scalaVersion.startsWith('2.10')) { ext.baseScalaVersion = '2.10' } else if (scalaVersion.startsWith('2.11')) { ext.baseScalaVersion = '2.11' } else { ext.baseScalaVersion = scalaVersion } On Tue, Oct 21, 2014 at 1:58 PM, Joe Stein joe.st...@stealth.ly wrote: This is the first candidate for release of Apache Kafka 0.8.2-beta Release Notes for the 0.8.2-beta release https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html *** Please download, test and vote by Friday, October 24th, 2pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/ * java-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Commented] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182126#comment-14182126 ] Ewen Cheslack-Postava commented on KAFKA-1471: -- This was already applied, just never marked resolved. Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2, 0.8.3 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1471) Add Producer Unit Tests for LZ4 and LZ4HC compression
[ https://issues.apache.org/jira/browse/KAFKA-1471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1471: - Resolution: Fixed Fix Version/s: 0.8.2 Status: Resolved (was: Patch Available) Add Producer Unit Tests for LZ4 and LZ4HC compression - Key: KAFKA-1471 URL: https://issues.apache.org/jira/browse/KAFKA-1471 Project: Kafka Issue Type: Sub-task Reporter: James Oliver Assignee: Ewen Cheslack-Postava Fix For: 0.8.2, 0.8.3 Attachments: KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471.patch, KAFKA-1471_2014-10-12_16:02:19.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182130#comment-14182130 ] Ewen Cheslack-Postava commented on KAFKA-1721: -- I have the trivial patch, but the upstream jar seems to be broken (see the earlier Github issue). I'll follow up on this once that issue is resolved. Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26885: Patch for KAFKA-1642
On Oct. 23, 2014, 9:43 p.m., Jun Rao wrote: clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java, lines 197-199 https://reviews.apache.org/r/26885/diff/2/?file=726776#file726776line197 It seems that in this case, the nextReadyCheckDelayMs should be the remaining linger time for tp1, which is lingerMs/2. Should we just assert that? tp1 and tp2 have the same leader, node1. The test adds enough data to make tp2 sendable, so in the ideal case only tp3 would be used to determine the timeout, which should require lingerMs more time. However, the test checks for <= lingerMs because a single scan through the topic partitions means that we can incorporate the lingerMs/2 timeout from tp1 even though we determine later that we really want to ignore it (and I actually saw this happen when I initially wrote the test to check for the exact value). I think the tradeoff of sometimes waking up a bit earlier than needed is probably worthwhile since it keeps the implementation simpler and cheaper. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review57513 --- On Oct. 21, 2014, 12:34 a.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 21, 2014, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period.
Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 23, 2014, 11:19 p.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description (updated) --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Addressing Jun's comments. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Attachment: KAFKA-1642_2014-10-23_16:19:41.patch [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182165#comment-14182165 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- Updated reviewboard https://reviews.apache.org/r/26885/diff/ against branch origin/trunk [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1721: - Attachment: KAFKA-1721.patch Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182280#comment-14182280 ] Ewen Cheslack-Postava commented on KAFKA-1721: -- Created reviewboard https://reviews.apache.org/r/27124/diff/ against branch origin/trunk Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1721: - Status: Patch Available (was: Open) Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 27124: Patch for KAFKA-1721
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27124/ --- Review request for kafka. Bugs: KAFKA-1721 https://issues.apache.org/jira/browse/KAFKA-1721 Repository: kafka Description --- KAFKA-1721 Bump snappy-java version for thread-safety fix. Diffs - build.gradle c3e6bb839ad65c512c9db4695d2bb49b82c80da5 Diff: https://reviews.apache.org/r/27124/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 26885: Patch for KAFKA-1642
On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122 https://reviews.apache.org/r/26885/diff/3/?file=731636#file731636line122 The comments "When connecting or connected, this handles slow/stalled connections" here are a bit misleading: after checking the code I realize connectionDelay is only triggered to determine the delay in millis before we can re-check connectivity for a node that is not connected, and hence if the node is connected again while we are determining its delay, we just set it to MAX. Instead of making it general to the KafkaClient interface, shall we just add this to the code block of line 155? It gets triggered any time NetworkClient.ready returns false for a node. The obvious case is that it will return not ready when disconnected, but it also does so when connecting, or when connected but inFlightRequests.canSendMore() returns false (thus the mention of slow/stalled connections). The important thing is that the value returned *is* MAX_VALUE in those latter cases, because neither one will be resolved by polling -- they both require an external event (connection established/failed, or an outstanding request receiving a response) which should wake up the event loop when there's something to do. That keeps us from polling unnecessarily. Previously there were conditions in which connections in these states could trigger busy waiting in the poll loop. I don't think we can get the same effect by just inlining the code, because it uses state that's only available through ClusterConnectionStates, which is private to NetworkClient. The KafkaClient interface only exposes the higher-level concept of ready. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review58575 --- On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 23, 2014, 11:19 p.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good), which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly; it always used the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Addressing Jun's comments.
Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
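To make the connectionDelay() discussion above concrete, here is a hedged sketch of the intended behavior (the enum and field names are illustrative, not the exact NetworkClient internals): only a disconnected node's delay is time-driven, so connecting and connected-but-busy nodes return MAX_VALUE and rely on an external event to wake the poll loop.
{code:java}
// Illustrative sketch, not the actual NetworkClient code.
public class ConnectionDelayExample {
    enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

    private final long reconnectBackoffMs;

    public ConnectionDelayExample(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    /**
     * How long poll() may block before it is worth re-checking this node.
     * Only a DISCONNECTED node is resolved by the passage of time (its
     * reconnect backoff expiring). CONNECTING nodes and CONNECTED nodes
     * that cannot accept more in-flight requests are resolved by external
     * events (connection established/failed, or a response arriving),
     * which wake the event loop themselves -- so returning MAX_VALUE here
     * is what prevents busy-waiting in the poll loop.
     */
    public long connectionDelay(ConnectionState state, long lastConnectAttemptMs, long nowMs) {
        if (state == ConnectionState.DISCONNECTED) {
            long waited = nowMs - lastConnectAttemptMs;
            return Math.max(reconnectBackoffMs - waited, 0);
        }
        return Long.MAX_VALUE;
    }
}
{code}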
Review Request 27232: Patch for KAFKA-559
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27232/ --- Review request for kafka. Bugs: KAFKA-559 https://issues.apache.org/jira/browse/KAFKA-559 Repository: kafka Description --- Addressing Joel's comments. Fix naming: entires -> entries. Only remove partitions from a group if all partitions were last modified before the threshold date. Diffs - core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala PRE-CREATION Diff: https://reviews.apache.org/r/27232/diff/ Testing --- Thanks, Ewen Cheslack-Postava
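A rough illustration of the rule in the description above -- remove a group only if *all* of its partitions were last modified before the threshold -- might look like the following. This is Java for illustration only (the actual tool is CleanupObsoleteZkEntries.scala); the /consumers/<group>/offsets/<topic>/<partition> path layout is the 0.8-era ZK convention, and the helper name is made up.
{code:java}
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.data.Stat;

public class ObsoleteGroupCheck {

    // Returns true only if every offset znode under the group was last
    // modified before thresholdMs (epoch millis). A single fresh
    // partition keeps the whole group, matching the rule above.
    public static boolean allOffsetsOlderThan(ZkClient zk, String group, long thresholdMs) {
        String offsetsPath = "/consumers/" + group + "/offsets";
        Stat stat = new Stat();
        for (String topic : zk.getChildren(offsetsPath)) {
            String topicPath = offsetsPath + "/" + topic;
            for (String partition : zk.getChildren(topicPath)) {
                // readData fills the Stat, giving us the znode's mtime.
                zk.readData(topicPath + "/" + partition, stat);
                if (stat.getMtime() >= thresholdMs)
                    return false;
            }
        }
        return true;
    }
}
{code}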
[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-559: Attachment: KAFKA-559.patch Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Tejas Patil Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
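For concreteness, a hypothetical invocation of the proposed tool (flag names taken from the sketch in the description; the values are invented):
{code}
bin/delete-obsolete-consumer-groups.sh --topic t1 --since 2014-09-01 --zookeeper zk1:2181
{code}
Any group (restricted to one topic's subtree when --topic is given) whose offsets all predate the --since date would be removed.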
[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-559: Assignee: Ewen Cheslack-Postava (was: Tejas Patil) Status: Patch Available (was: Open) Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Ewen Cheslack-Postava Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries
[ https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185377#comment-14185377 ] Ewen Cheslack-Postava commented on KAFKA-559: - Created reviewboard https://reviews.apache.org/r/27232/diff/ against branch origin/trunk Garbage collect old consumer metadata entries - Key: KAFKA-559 URL: https://issues.apache.org/jira/browse/KAFKA-559 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Tejas Patil Labels: newbie, project Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch Many use cases involve transient consumers. These consumers create entries under their consumer group in zk and maintain offsets there as well. There is currently no way to delete these entries. It would be good to have a tool that did something like bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] --zookeeper [zk_connect] This would scan through consumer group entries and delete any that had no offset update since the given date. -- This message was sent by Atlassian JIRA (v6.3.4#6332)