[jira] [Comment Edited] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075473#comment-14075473 ] nicu marasoiu edited comment on KAFKA-1510 at 7/27/14 6:34 AM: --- Forcing all offsets to zk too does have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if the kafka commit is retried). However, the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka, if any). That too poses the same risk of committing multiple times to one system (zk) if the other (kafka) needs retries. So a clean way here would be separate OffsetDAO implementations: one for kafka, one for zookeeper, and one for dual mode, where reads take max(both), as now, and writes go to both implementations, each doing its own retries without affecting the other. was (Author: nmarasoi): Forcing all offsets to zk too does have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if the kafka commit is retried). However, the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka, if any). That too poses the same risk of committing multiple times to one system (zk) if the other (kafka) needs retries. So a clean way here would be separate OffsetDAO implementations: one for kafka, one for zookeeper, and one for dual mode, where reads take max(both), as now, and writes go to both implementations, each doing its own retries without affecting the other. Also, the code is written in a blocking manner, serializing operations with brokers and kafka/zk, which could all be done in parallel on different tcp connections and different threads (or in the same thread with NIO). I think a non-blocking client architecture that can do things in parallel is underway with the new clients in java, is it? Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 Attachments: forceCommitOnShutdownWhenDualCommit.patch When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
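A minimal sketch of the dual-storage idea proposed above, assuming hypothetical names (OffsetsDao, KafkaOffsetsDao/ZookeeperOffsetsDao implementations and DualOffsetsDao are not existing Kafka classes): reads take the larger offset from the two backends and writes go to both, with each backend doing its own retries.

{code}
import kafka.common.TopicAndPartition

// Hypothetical interface; each backing implementation handles its own retries.
trait OffsetsDao {
  def commit(offsets: Map[TopicAndPartition, Long]): Unit
  def fetch(partitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long]
}

class DualOffsetsDao(kafkaDao: OffsetsDao, zkDao: OffsetsDao) extends OffsetsDao {

  // Write to both backends unconditionally; a failure (or retry) in one
  // backend neither blocks nor duplicates the write to the other.
  override def commit(offsets: Map[TopicAndPartition, Long]): Unit = {
    try kafkaDao.commit(offsets) catch { case e: Exception => () } // log and continue
    try zkDao.commit(offsets) catch { case e: Exception => () }    // log and continue
  }

  // Read from both and take the larger offset per partition, mirroring the
  // current dual-commit read path (max of kafka and zookeeper).
  override def fetch(partitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] = {
    val fromKafka = kafkaDao.fetch(partitions)
    val fromZk = zkDao.fetch(partitions)
    partitions.map { tp =>
      tp -> math.max(fromKafka.getOrElse(tp, -1L), fromZk.getOrElse(tp, -1L))
    }.toMap
  }
}
{code}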
[jira] [Issue Comment Deleted] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu updated KAFKA-1510: - Comment: was deleted (was: attached the patch with the meaning detailed in my prev comment) Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 Attachments: forceCommitOnShutdownWhenDualCommit.patch When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075447#comment-14075447 ] nicu marasoiu edited comment on KAFKA-1510 at 7/27/14 6:38 AM: --- Hi, the isAutoCommit argument seems to work exactly the other way around: apparently it is set to false from the scheduled auto-commit and to true from zkConsConn.commitOffsets(). So the migration of offsets from zk to kafka is: set dual commit and kafka storage, restart the consumers, wait for kafka to receive the committed offsets, then take out dual commit. Currently kafka receives the offsets only while data flows, so for the purpose of this task we need to add one or two more cases where it gets them: when shutting down, or perhaps periodically. So this task applies only when storage==kafka and dualCommit==true, right? I would first ask why we write the new offsets to zookeeper only if the write to kafka was ok. My assumption is that it is to make sure there is only one write to zookeeper, even though the process of writing to kafka may involve retries. I would write in both directions at all times, and perhaps keep two checkpoint structures, one for kafka and one for zookeeper. I am creating a patch now with a forceCommit that makes sure all offsets are committed to both kafka and zookeeper when shutting down in dual-commit mode. The usefulness of committing all offsets not only to kafka but to zookeeper as well comes from at least one reason, the one I mentioned above: if the kafka offset write fails completely, zookeeper never receives those offsets. Forcing all offsets to zk on shutdown too does have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if the kafka commit is retried). However, the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka, if any). That too poses the same risk of committing multiple times to one system (zk) if the other (kafka) needs retries. So a clean way here would be separate OffsetDAO implementations: one for kafka, one for zookeeper, and one for dual mode, where reads take max(both), as now, and writes go to both implementations, each doing its own retries without affecting the other. was (Author: nmarasoi): Hi, the isAutoCommit argument seems to work exactly the other way around: apparently it is set to false from the scheduled auto-commit and to true from zkConsConn.commitOffsets(). So the migration of offsets from zk to kafka is: set dual commit and kafka storage, restart the consumers, wait for kafka to receive the committed offsets, then take out dual commit. Currently kafka receives the offsets only while data flows, so for the purpose of this task we need to add one or two more cases where it gets them: when shutting down, or perhaps periodically. So this task applies only when storage==kafka and dualCommit==true, right? I would first ask why we write the new offsets to zookeeper only if the write to kafka was ok. My assumption is that it is to make sure there is only one write to zookeeper, even though the process of writing to kafka may involve retries. I would write in both directions at all times, and perhaps keep two checkpoint structures, one for kafka and one for zookeeper. I am creating a patch now with a forceCommit that makes sure all offsets are committed to both kafka and zookeeper when shutting down in dual-commit mode.
The usefulness of committing all offsets not only to kafka but to zookeeper as well comes from at least one reason, the one I mentioned above: if the kafka offset write fails completely, zookeeper never receives those offsets. Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 Attachments: forceCommitOnShutdownWhenDualCommit.patch When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10
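A self-contained, hypothetical sketch of the "force commit on shutdown" behaviour described in the comment above (the names OffsetCommitter, autoCommit and forceCommit are illustrative and do not correspond to the attached patch): regular auto-commits only push offsets that moved, while the shutdown/rebalance path in dual-commit mode pushes everything so idle partitions also reach Kafka.

{code}
import kafka.common.TopicAndPartition

// Illustrative only; the real logic lives in ZookeeperConsumerConnector.
class OffsetCommitter(dualCommit: Boolean,
                      commitToKafka: Map[TopicAndPartition, Long] => Unit,
                      commitToZookeeper: Map[TopicAndPartition, Long] => Unit) {

  private var lastCommitted = Map.empty[TopicAndPartition, Long]

  // Regular auto-commit: only offsets that moved since the last commit.
  def autoCommit(current: Map[TopicAndPartition, Long]): Unit =
    commit(current.filter { case (tp, offset) => lastCommitted.get(tp) != Some(offset) })

  // Shutdown / rebalance in dual-commit mode: push everything, moved or not,
  // so offsets of low-volume or idle partitions also get copied to Kafka.
  def forceCommit(current: Map[TopicAndPartition, Long]): Unit =
    commit(current)

  private def commit(offsets: Map[TopicAndPartition, Long]): Unit =
    if (offsets.nonEmpty) {
      commitToKafka(offsets)
      if (dualCommit) commitToZookeeper(offsets)
      lastCommitted = lastCommitted ++ offsets
    }
}
{code}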
[jira] [Issue Comment Deleted] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu updated KAFKA-1510: - Comment: was deleted (was: Forcing all offsets to zk too does have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if the kafka commit is retried). However, the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka, if any). That too poses the same risk of committing multiple times to one system (zk) if the other (kafka) needs retries. So a clean way here would be separate OffsetDAO implementations: one for kafka, one for zookeeper, and one for dual mode, where reads take max(both), as now, and writes go to both implementations, each doing its own retries without affecting the other. ) Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 Attachments: forceCommitOnShutdownWhenDualCommit.patch When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Assigned] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nicu marasoiu reassigned KAFKA-1549: Assignee: Jun Rao (was: nicu marasoiu) dead brokers coming in the TopicMetadataResponse Key: KAFKA-1549 URL: https://issues.apache.org/jira/browse/KAFKA-1549 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Environment: trunk Reporter: nicu marasoiu Assignee: Jun Rao Attachments: bringAllBrokers.patch Jun Rao, confirming my observation that brokers are only added to the metadataCache and never removed: The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. -- This message was sent by Atlassian JIRA (v6.2#6252)
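A minimal sketch of the fix described in the ticket, using a deliberately simplified cache (the real MetadataCache.updateCache() works on an UpdateMetadataRequest and also tracks partition state): replace the cached live-broker list with the list carried by the request instead of only adding to it.

{code}
import kafka.cluster.Broker

// Hypothetical, stripped-down cache; only the live-broker bookkeeping is shown.
class SimpleMetadataCache {
  private var aliveBrokers = Map.empty[Int, Broker]

  // The behaviour being reported: brokers are only ever added, so a broker
  // that dies stays in the cache and keeps showing up in TopicMetadataResponse.
  def updateByAdding(brokersFromRequest: Seq[Broker]): Unit = synchronized {
    aliveBrokers = aliveBrokers ++ brokersFromRequest.map(b => b.id -> b)
  }

  // The suggested fix: take the new live-broker list from the request and cache it as-is.
  def updateByReplacing(brokersFromRequest: Seq[Broker]): Unit = synchronized {
    aliveBrokers = brokersFromRequest.map(b => b.id -> b).toMap
  }

  def liveBrokers: Seq[Broker] = synchronized { aliveBrokers.values.toSeq }
}
{code}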
[jira] [Created] (KAFKA-1559) Upgrade to Gradle 2.0
Stevo Slavic created KAFKA-1559: --- Summary: Upgrade to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-1559: Summary: Upgrade Gradle wrapper to Gradle 2.0 (was: Upgrade to Gradle 2.0) Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[GitHub] kafka pull request: Upgrade Gradle wrapper to Gradle 2.0
GitHub user sslavic opened a pull request: https://github.com/apache/kafka/pull/29 Upgrade Gradle wrapper to Gradle 2.0 This patch upgrades gradle wrapper to gradle 2.0. As consequence license plugin dependency had to be upgraded as well. Issue: KAFKA-1559 You can merge this pull request into a Git repository by running: $ git pull https://github.com/sslavic/kafka KAFKA-1559 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/29.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #29 commit c64fb1c7c61c4dd2a0ed0989f357caa9de1027a8 Author: Stevo Slavic ssla...@gmail.com Date: 2014-07-27T10:48:56Z KAFKA-1559: Upgraded gradle wrapper to gradle 2.0; as consequence license plugin dependency had to be upgraded as well --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075603#comment-14075603 ] ASF GitHub Bot commented on KAFKA-1559: --- GitHub user sslavic opened a pull request: https://github.com/apache/kafka/pull/29 Upgrade Gradle wrapper to Gradle 2.0 This patch upgrades gradle wrapper to gradle 2.0. As consequence license plugin dependency had to be upgraded as well. Issue: KAFKA-1559 You can merge this pull request into a Git repository by running: $ git pull https://github.com/sslavic/kafka KAFKA-1559 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/29.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #29 commit c64fb1c7c61c4dd2a0ed0989f357caa9de1027a8 Author: Stevo Slavic ssla...@gmail.com Date: 2014-07-27T10:48:56Z KAFKA-1559: Upgraded gradle wrapper to gradle 2.0; as consequence license plugin dependency had to be upgraded as well Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075604#comment-14075604 ] Stevo Slavic commented on KAFKA-1559: - Created pull request for this: https://github.com/apache/kafka/pull/29 Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Issue Comment Deleted] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-1559: Comment: was deleted (was: Created pull request for this: https://github.com/apache/kafka/pull/29) Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-1559: Status: Patch Available (was: Open) Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1549: --- Resolution: Fixed Fix Version/s: 0.8.2 Assignee: nicu marasoiu (was: Jun Rao) Status: Resolved (was: Patch Available) Thanks for the patch. +1 and committed to trunk. Hmm, I am not sure why you didn't get the email notification. Did you just miss this particular one, or did you never get any notification? dead brokers coming in the TopicMetadataResponse Key: KAFKA-1549 URL: https://issues.apache.org/jira/browse/KAFKA-1549 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Environment: trunk Reporter: nicu marasoiu Assignee: nicu marasoiu Fix For: 0.8.2 Attachments: bringAllBrokers.patch Jun Rao, confirming my observation that brokers are only added to the metadataCache and never removed: The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23962: Patch for KAFKA-1451
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23962/#review48820 --- Thanks for the patch. A couple of comments below. core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala https://reviews.apache.org/r/23962/#comment8 Both methods can be private. core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala https://reviews.apache.org/r/23962/#comment85556 Let's add an explanation of why we need to do this check in the comment. How about the following? We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, it's possible that the controller has already been elected when we get here. This check will prevent the following createEphemeralPath method from getting into an infinite loop if this broker is already the controller. - Jun Rao On July 26, 2014, 9:48 a.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23962/ --- (Updated July 26, 2014, 9:48 a.m.) Review request for kafka. Bugs: KAFKA-1451 https://issues.apache.org/jira/browse/KAFKA-1451 Repository: kafka Description --- controller existence check added in election process of ZookeeperLeaderElector Diffs - core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala e5b6ff1e2544b043007cf16a6b9dd4451c839e63 Diff: https://reviews.apache.org/r/23962/diff/ Testing --- Thanks, Manikumar Reddy O
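A hedged sketch of the controller-existence check being discussed; the helper name and its use from elect() are assumptions, not the exact patch (the real code lives in kafka.server.ZookeeperLeaderElector and stores JSON at the election path, here simplified to a presence check).

{code}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

// Returns true if some broker (possibly this one) already holds the ephemeral
// election node, in which case elect() should skip createEphemeralPath and
// simply return amILeader instead of looping.
def controllerAlreadyElected(zkClient: ZkClient, electionPath: String): Boolean = {
  // readDataMaybeNull returns (Option[String], Stat) for the given path;
  // a defined value means the controller znode already exists.
  val (dataOpt, _) = ZkUtils.readDataMaybeNull(zkClient, electionPath)
  dataOpt.isDefined
}
{code}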
[jira] [Commented] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075649#comment-14075649 ] Jun Rao commented on KAFKA-1559: Could you attach the changes as a patch to this jira? This is needed by Apache. Thanks, Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1558) AdminUtils.deleteTopic does not work
[ https://issues.apache.org/jira/browse/KAFKA-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075653#comment-14075653 ] Jun Rao commented on KAFKA-1558: Delete topic is not supported in 0.8.1.1. It is implemented in trunk, but hasn't been fully tested. The way this works is not to delete the topic path directly. Instead, it first indicates in a separate path that we want to delete a topic. Once all data related to the topic is deleted, the controller will then delete the topic path. AdminUtils.deleteTopic does not work Key: KAFKA-1558 URL: https://issues.apache.org/jira/browse/KAFKA-1558 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Henning Schmiedehausen the AdminUtils.deleteTopic method is implemented as {code} def deleteTopic(zkClient: ZkClient, topic: String) { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) } {code} but the DeleteTopicCommand actually does {code} zkClient = new ZkClient(zkConnect, 3, 3, ZKStringSerializer) zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) {code} so I guess that the 'createPersistentPath' above should actually be {code} def deleteTopic(zkClient: ZkClient, topic: String) { ZkUtils.deletePathRecursive(zkClient, ZkUtils.getTopicPath(topic)) } {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
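A hedged usage sketch of the trunk behaviour Jun describes: AdminUtils.deleteTopic only creates the delete marker, and the topic path disappears later once the controller has removed the topic's data. The polling helper below is illustrative, not part of Kafka, and assumes the ZkUtils.pathExists and ZkUtils.getTopicPath helpers shown in the ticket.

{code}
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

// Request deletion and wait (up to timeoutMs) for the controller to finish.
def deleteTopicAndWait(zkClient: ZkClient, topic: String, timeoutMs: Long = 30000L): Boolean = {
  // On trunk this only writes a marker under the admin delete-topics path.
  AdminUtils.deleteTopic(zkClient, topic)

  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    // The controller removes the topic path once all topic data is gone.
    if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
      return true
    Thread.sleep(200)
  }
  false
}
{code}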
[jira] [Updated] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1533: --- Attachment: stack.out I saw this test hang again. Attached is the stacktrace. daemon-producer seems to hang during shutdown. Not sure why though. transient unit test failure in ProducerFailureHandlingTest -- Key: KAFKA-1533 URL: https://issues.apache.org/jira/browse/KAFKA-1533 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Guozhang Wang Fix For: 0.8.2 Attachments: KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533_2014-07-21_15:45:58.patch, stack.out Occasionally, saw the test hang on tear down. The following is the stack trace. Test worker prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() [10e075000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on 7f4e69578 (a org.apache.zookeeper.ClientCnxn$Packet) at java.lang.Object.wait(Object.java:485) at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344) - locked 7f4e69578 (a org.apache.zookeeper.ClientCnxn$Packet) at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732) at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91) at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716) at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416) at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184) at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50) at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:45) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:45) at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243) at kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Reopened] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reopened KAFKA-1533: transient unit test failure in ProducerFailureHandlingTest -- Key: KAFKA-1533 URL: https://issues.apache.org/jira/browse/KAFKA-1533 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Guozhang Wang Fix For: 0.8.2 Attachments: KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533.patch, KAFKA-1533_2014-07-21_15:45:58.patch, stack.out Occasionally, saw the test hang on tear down. The following is the stack trace. Test worker prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() [10e075000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on 7f4e69578 (a org.apache.zookeeper.ClientCnxn$Packet) at java.lang.Object.wait(Object.java:485) at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344) - locked 7f4e69578 (a org.apache.zookeeper.ClientCnxn$Packet) at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732) at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91) at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716) at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416) at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184) at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50) at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:45) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:45) at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243) at kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1542: --- Resolution: Fixed Fix Version/s: 0.8.2 Assignee: David Corley Status: Resolved (was: Patch Available) Thanks for the patch. Committed to trunk. normal IOException in the new producer is logged as ERROR - Key: KAFKA-1542 URL: https://issues.apache.org/jira/browse/KAFKA-1542 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: David Corley Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1542.patch Saw the following error in the log. It seems this can happen if the broker is down. So, this probably should be logged as WARN, instead ERROR. 2014/07/16 00:12:51.799 [Selector] Error in I/O: java.io.IOException: Connection timed out at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60) at org.apache.kafka.common.network.Selector.poll(Selector.java:241) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114) at java.lang.Thread.run(Thread.java:744) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075671#comment-14075671 ] Jun Rao commented on KAFKA-1414: 50. Got it. I was thinking that if for some reason, one of the two values is defined, but the other is not, perhaps it's better to error out instead of silently letting it go. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Fix For: 0.8.2 Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
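The fixed-threadpool patches attached to the ticket move in the direction sketched below; this is a hypothetical, self-contained illustration (the per-directory recovery work is stubbed out as a function argument, and loadLogsInParallel is not a name from the patch) of bounding the parallelism with a fixed pool instead of one raw Thread per data directory.

{code}
import java.io.File
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.JavaConverters._

// recoverDir stands in for the real per-directory work: reading recovery
// points, loading each log directory, deleting the clean-shutdown file.
def loadLogsInParallel(dirs: Seq[File], numThreads: Int)(recoverDir: File => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(numThreads)
  try {
    val tasks = dirs.map { dir =>
      new Callable[Unit] { override def call(): Unit = recoverDir(dir) }
    }
    // invokeAll blocks until every directory has been processed; check each
    // future explicitly so a failed recovery surfaces instead of being swallowed.
    val futures = pool.invokeAll(tasks.asJava)
    futures.asScala.foreach(_.get())
  } finally {
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
}
{code}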
Build failed in Jenkins: Kafka-trunk #233
See https://builds.apache.org/job/Kafka-trunk/233/changes Changes: [junrao] kafka-1549; dead brokers coming in the TopicMetadataResponse; patched by Nicu Marasoiu; reviewed by Jun Rao -- [...truncated 444 lines...] org.apache.kafka.common.record.RecordTest testEquality[46] PASSED org.apache.kafka.common.record.RecordTest testFields[47] PASSED org.apache.kafka.common.record.RecordTest testChecksum[47] PASSED org.apache.kafka.common.record.RecordTest testEquality[47] PASSED org.apache.kafka.common.record.RecordTest testFields[48] PASSED org.apache.kafka.common.record.RecordTest testChecksum[48] PASSED org.apache.kafka.common.record.RecordTest testEquality[48] PASSED org.apache.kafka.common.record.RecordTest testFields[49] PASSED org.apache.kafka.common.record.RecordTest testChecksum[49] PASSED org.apache.kafka.common.record.RecordTest testEquality[49] PASSED org.apache.kafka.common.record.RecordTest testFields[50] PASSED org.apache.kafka.common.record.RecordTest testChecksum[50] PASSED org.apache.kafka.common.record.RecordTest testEquality[50] PASSED org.apache.kafka.common.record.RecordTest testFields[51] PASSED org.apache.kafka.common.record.RecordTest testChecksum[51] PASSED org.apache.kafka.common.record.RecordTest testEquality[51] PASSED org.apache.kafka.common.record.RecordTest testFields[52] PASSED org.apache.kafka.common.record.RecordTest testChecksum[52] PASSED org.apache.kafka.common.record.RecordTest testEquality[52] PASSED org.apache.kafka.common.record.RecordTest testFields[53] PASSED org.apache.kafka.common.record.RecordTest testChecksum[53] PASSED org.apache.kafka.common.record.RecordTest testEquality[53] PASSED org.apache.kafka.common.record.RecordTest testFields[54] PASSED org.apache.kafka.common.record.RecordTest testChecksum[54] PASSED org.apache.kafka.common.record.RecordTest testEquality[54] PASSED org.apache.kafka.common.record.RecordTest testFields[55] PASSED org.apache.kafka.common.record.RecordTest testChecksum[55] PASSED org.apache.kafka.common.record.RecordTest testEquality[55] PASSED org.apache.kafka.common.record.RecordTest testFields[56] PASSED org.apache.kafka.common.record.RecordTest testChecksum[56] PASSED org.apache.kafka.common.record.RecordTest testEquality[56] PASSED org.apache.kafka.common.record.RecordTest testFields[57] PASSED org.apache.kafka.common.record.RecordTest testChecksum[57] PASSED org.apache.kafka.common.record.RecordTest testEquality[57] PASSED org.apache.kafka.common.record.RecordTest testFields[58] PASSED org.apache.kafka.common.record.RecordTest testChecksum[58] PASSED org.apache.kafka.common.record.RecordTest testEquality[58] PASSED org.apache.kafka.common.record.RecordTest testFields[59] PASSED org.apache.kafka.common.record.RecordTest testChecksum[59] PASSED org.apache.kafka.common.record.RecordTest testEquality[59] PASSED org.apache.kafka.common.record.RecordTest testFields[60] PASSED org.apache.kafka.common.record.RecordTest testChecksum[60] PASSED org.apache.kafka.common.record.RecordTest testEquality[60] PASSED org.apache.kafka.common.record.RecordTest testFields[61] PASSED org.apache.kafka.common.record.RecordTest testChecksum[61] PASSED org.apache.kafka.common.record.RecordTest testEquality[61] PASSED org.apache.kafka.common.record.RecordTest testFields[62] PASSED org.apache.kafka.common.record.RecordTest testChecksum[62] PASSED org.apache.kafka.common.record.RecordTest testEquality[62] PASSED org.apache.kafka.common.record.RecordTest testFields[63] PASSED org.apache.kafka.common.record.RecordTest testChecksum[63] PASSED 
org.apache.kafka.common.record.RecordTest testEquality[63] PASSED org.apache.kafka.common.record.RecordTest testFields[64] PASSED org.apache.kafka.common.record.RecordTest testChecksum[64] PASSED org.apache.kafka.common.record.RecordTest testEquality[64] PASSED org.apache.kafka.common.record.RecordTest testFields[65] PASSED org.apache.kafka.common.record.RecordTest testChecksum[65] PASSED org.apache.kafka.common.record.RecordTest testEquality[65] PASSED org.apache.kafka.common.record.RecordTest testFields[66] PASSED org.apache.kafka.common.record.RecordTest testChecksum[66] PASSED org.apache.kafka.common.record.RecordTest testEquality[66] PASSED org.apache.kafka.common.record.RecordTest testFields[67] PASSED org.apache.kafka.common.record.RecordTest testChecksum[67] PASSED org.apache.kafka.common.record.RecordTest testEquality[67] PASSED org.apache.kafka.common.record.RecordTest testFields[68] PASSED org.apache.kafka.common.record.RecordTest testChecksum[68] PASSED org.apache.kafka.common.record.RecordTest testEquality[68] PASSED org.apache.kafka.common.record.RecordTest testFields[69] PASSED org.apache.kafka.common.record.RecordTest testChecksum[69] PASSED org.apache.kafka.common.record.RecordTest testEquality[69] PASSED
Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure
Nope. It definitely sent. Are there some restrictions on mailing list attachments I wonder? I'll put it inline here: = 2014-07-25 18:40:15 Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.65-b04-462 mixed mode): Attach Listener daemon prio=9 tid=7fcfb92a5000 nid=0x11a961000 waiting on condition [] java.lang.Thread.State: RUNNABLE kafka-scheduler-17 daemon prio=5 tid=7fcfbb80c000 nid=0x1387c3000 waiting on condition [1387c2000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 7f53b84d8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:957) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:917) at java.lang.Thread.run(Thread.java:695) ReplicaFetcherThread-0-0 prio=5 tid=7fcfbb80b000 nid=0x1385bd000 runnable [1385bb000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69) - locked 7f53b7578 (a sun.nio.ch.Util$2) - locked 7f53b7560 (a java.util.Collections$UnmodifiableSet) - locked 7f5400668 (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80) at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:193) - locked 7f53b7590 (a java.lang.Object) at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86) - locked 7f5405530 (a sun.nio.ch.SocketAdaptor$SocketInputStream) at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221) - locked 7f5407fc0 (a java.lang.Object) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) - locked 7f5407ff0 (a java.lang.Object) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) Controller-0-to-broker-1-send-thread prio=5 tid=7fcfbb809800 nid=0x1384ba000 waiting on condition [1384b9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 7f53eba88 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) kafka-scheduler-16 daemon prio=5 tid=7fcfbb809000 nid=0x1383b7000 waiting on condition [1383b6000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 7f53b84d8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at
[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
[ https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075679#comment-14075679 ] Jun Rao commented on KAFKA-1477: Joe, I'd rather that we don't rush to get this patch in. This is because (1) we have accumulated a lot of changes for the next release (0.8.2) in trunk, including Kafka-based offset management and the new clients. It's going to be difficult to absorb big patches like this in the same release. (2) I felt that we haven't had enough discussion on the implementation. I took a look at the changes that you made in https://cwiki.apache.org/confluence/display/KAFKA/Security. What's in there are mostly feature requirements. I was expecting to see a design doc of the implementation. I am no security expert, but I have questions like (a) should we use two separate server ports so that we can support both secure and non-secure clients in the same cluster (b) is a local secure file the right way to store security credentials? If we have a more concrete design doc, perhaps more people with security experience can chime in and help us make the right design choice. We can also discuss whether the security feature should only be done on the new clients or not. At this moment, we are trying to put the old clients mostly in maintenance mode and will only try to fix blocker issues. The more we need to patch on the old clients, the more the maintenance work. Also, the new consumer will remove the ZK dependence. That potentially will make adding the security feature a bit easier on the consumer. So, I recommend that we start working on a more concrete design doc first and then solicit some feedback. We can probably target this feature in 0.9. add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication -- Key: KAFKA-1477 URL: https://issues.apache.org/jira/browse/KAFKA-1477 Project: Kafka Issue Type: New Feature Reporter: Joe Stein Assignee: Ivan Lyutov Fix For: 0.8.2 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, KAFKA-1477_2014-06-03_13:46:17.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure
David, Apache mailing list doesn't seem to allow large attachments. Could you attach the stacktrace to the jira KAFKA-1533 (now reopened)? Thanks, Jun On Sun, Jul 27, 2014 at 11:21 AM, David Corley davidcor...@gmail.com wrote: Nope. It definitely sent. Are there some restrictions on mailing list attachments I wonder? I'll put it inline here: = 2014-07-25 18:40:15 Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.65-b04-462 mixed mode): Attach Listener daemon prio=9 tid=7fcfb92a5000 nid=0x11a961000 waiting on condition [] java.lang.Thread.State: RUNNABLE kafka-scheduler-17 daemon prio=5 tid=7fcfbb80c000 nid=0x1387c3000 waiting on condition [1387c2000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 7f53b84d8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:957) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:917) at java.lang.Thread.run(Thread.java:695) ReplicaFetcherThread-0-0 prio=5 tid=7fcfbb80b000 nid=0x1385bd000 runnable [1385bb000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136) at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69) - locked 7f53b7578 (a sun.nio.ch.Util$2) - locked 7f53b7560 (a java.util.Collections$UnmodifiableSet) - locked 7f5400668 (a sun.nio.ch.KQueueSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80) at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:193) - locked 7f53b7590 (a java.lang.Object) at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86) - locked 7f5405530 (a sun.nio.ch.SocketAdaptor$SocketInputStream) at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221) - locked 7f5407fc0 (a java.lang.Object) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) - locked 7f5407ff0 (a java.lang.Object) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111) at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) Controller-0-to-broker-1-send-thread prio=5 tid=7fcfbb809800 nid=0x1384ba000 waiting on condition [1384b9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 7f53eba88 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:121) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Build failed in Jenkins: Kafka-trunk #234
See https://builds.apache.org/job/Kafka-trunk/234/changes Changes: [junrao] kafka-1542; normal IOException in the new producer is logged as ERROR; patched by David Corley; reviewed by Jun Rao -- [...truncated 673 lines...] kafka.server.LogRecoveryTest testHWCheckpointNoFailuresSingleLogSegment PASSED kafka.server.LogRecoveryTest testHWCheckpointWithFailuresSingleLogSegment PASSED kafka.server.LogRecoveryTest testHWCheckpointNoFailuresMultipleLogSegments PASSED kafka.server.LogRecoveryTest testHWCheckpointWithFailuresMultipleLogSegments PASSED kafka.server.LeaderElectionTest testLeaderElectionAndEpoch PASSED kafka.server.LeaderElectionTest testLeaderElectionWithStaleControllerEpoch PASSED kafka.server.LogOffsetTest testGetOffsetsForUnknownTopic PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeLatestTime PASSED kafka.server.LogOffsetTest testEmptyLogsGetOffsets PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeNow PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeEarliestTime PASSED kafka.server.IsrExpirationTest testIsrExpirationForStuckFollowers PASSED kafka.server.IsrExpirationTest testIsrExpirationForSlowFollowers PASSED kafka.server.AdvertiseBrokerTest testBrokerAdvertiseToZK PASSED kafka.server.RequestPurgatoryTest testRequestSatisfaction PASSED kafka.server.RequestPurgatoryTest testRequestExpiry PASSED kafka.server.DynamicConfigChangeTest testConfigChange PASSED kafka.server.DynamicConfigChangeTest testConfigChangeOnNonExistingTopic PASSED kafka.server.OffsetCommitTest testUpdateOffsets PASSED kafka.server.OffsetCommitTest testCommitAndFetchOffsets PASSED kafka.server.OffsetCommitTest testLargeMetadataPayload PASSED kafka.server.ReplicaFetchTest testReplicaFetcherThread PASSED kafka.server.ReplicaManagerTest testHighWaterMarkDirectoryMapping PASSED kafka.server.ReplicaManagerTest testHighwaterMarkRelativeDirectoryMapping PASSED kafka.server.KafkaConfigTest testLogRetentionTimeHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMinutesProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMsProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeNoConfigProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndMsProvided PASSED kafka.server.KafkaConfigTest testAdvertiseDefaults PASSED kafka.server.KafkaConfigTest testAdvertiseConfigured PASSED kafka.server.KafkaConfigTest testUncleanLeaderElectionDefault PASSED kafka.server.KafkaConfigTest testUncleanElectionDisabled PASSED kafka.server.KafkaConfigTest testUncleanElectionEnabled PASSED kafka.server.KafkaConfigTest testUncleanElectionInvalid PASSED kafka.server.KafkaConfigTest testLogRollTimeMsProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeBothMsAndHoursProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeNoConfigProvided PASSED kafka.server.SimpleFetchTest testNonReplicaSeesHwWhenFetching PASSED kafka.server.SimpleFetchTest testReplicaSeesLeoWhenFetching PASSED kafka.server.ServerShutdownTest testCleanShutdown PASSED kafka.server.ServerShutdownTest testCleanShutdownWithDeleteTopicEnabled PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceSinglePartition PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceMultiplePartitions PASSED kafka.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.consumer.ZookeeperConsumerConnectorTest testCompression PASSED kafka.consumer.ZookeeperConsumerConnectorTest 
testCompressionSetConsumption PASSED kafka.consumer.ZookeeperConsumerConnectorTest testConsumerDecoder PASSED kafka.consumer.ZookeeperConsumerConnectorTest testLeaderSelectionForPartition PASSED kafka.consumer.ConsumerIteratorTest testConsumerIteratorDeduplicationDeepIterator PASSED kafka.consumer.ConsumerIteratorTest testConsumerIteratorDecodingFailure PASSED kafka.consumer.TopicFilterTest testWhitelists PASSED kafka.consumer.TopicFilterTest testBlacklists PASSED kafka.consumer.TopicFilterTest testWildcardTopicCountGetTopicCountMapEscapeJson PASSED kafka.log.LogTest testTimeBasedLogRoll PASSED kafka.log.LogTest testSizeBasedLogRoll PASSED kafka.log.LogTest testLoadEmptyLog PASSED kafka.log.LogTest testAppendAndReadWithSequentialOffsets PASSED kafka.log.LogTest testAppendAndReadWithNonSequentialOffsets PASSED kafka.log.LogTest testReadAtLogGap PASSED kafka.log.LogTest testReadOutOfRange PASSED kafka.log.LogTest testLogRolls PASSED kafka.log.LogTest testCompressedMessages PASSED kafka.log.LogTest testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED kafka.log.LogTest testMessageSizeCheck PASSED kafka.log.LogTest testLogRecoversToCorrectOffset PASSED kafka.log.LogTest testIndexRebuild PASSED kafka.log.LogTest testTruncateTo PASSED
[jira] [Updated] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stevo Slavic updated KAFKA-1559: Attachment: KAFKA-1559.patch Attached [^KAFKA-1559.patch] Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build Attachments: KAFKA-1559.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev5.patch 50. Used original approach, as seen before patching. Fixed everything else. [Updated patch|^KAFKA-1414-rev5.patch] Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Fix For: 0.8.2 Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, KAFKA-1414-rev5.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1559) Upgrade Gradle wrapper to Gradle 2.0
[ https://issues.apache.org/jira/browse/KAFKA-1559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075800#comment-14075800 ] Joe Stein commented on KAFKA-1559: -- KAFKA-1490 should be taken care of before the next release; this upgrade may be able to resolve it (getting rid of the binary and wrapper), or it needs to follow it. Upgrade Gradle wrapper to Gradle 2.0 Key: KAFKA-1559 URL: https://issues.apache.org/jira/browse/KAFKA-1559 Project: Kafka Issue Type: Task Components: build Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Priority: Trivial Labels: build Attachments: KAFKA-1559.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23339: Patch for KAFKA-1507
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/#review48822 --- Thanks for the patch. A few more comments below. clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java https://reviews.apache.org/r/23339/#comment85560 We need to pass in the createTopic flag in this constructor too. In the producer, we will then set createTopic to true, and in the consumer, we will set it to false. clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java https://reviews.apache.org/r/23339/#comment85558 Yes, I agree with you that keeping the V0 constructor is a bit confusing and doesn't provide any value. Could you remove this and the V0 constructor in OffsetCommitRequest as well? clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java https://reviews.apache.org/r/23339/#comment85557 Is this needed? core/src/main/scala/kafka/client/ClientUtils.scala https://reviews.apache.org/r/23339/#comment85559 I think we can just keep the original name and keep the default for createTopic as true to make it backward compatible. core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala https://reviews.apache.org/r/23339/#comment85561 To make this backward compatible, we need to keep the current constructor and add a new one with the createTopic flag. - Jun Rao On July 24, 2014, 12:07 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/ --- (Updated July 24, 2014, 12:07 a.m.) Review request for kafka. Bugs: KAFKA-1507 https://issues.apache.org/jira/browse/KAFKA-1507 Repository: kafka Description --- KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic unintentionally. Changes as per Jun's suggestions.
Diffs ---
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b22ca1dce65f665d84c2a980fd82f816e93d9960
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4
core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac
core/src/main/scala/kafka/client/ClientUtils.scala ce7ede3f6d60e756e252257bd8c6fedc21f21e1c
core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068
core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316
core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884
core/src/main/scala/kafka/tools/GetOffsetShell.scala 9c6064e201eebbcd5b276a0dedd02937439edc94
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 36314f412a8281aece2789fd2b74a106b82c57d2
core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 1bf2667f47853585bc33ffb3e28256ec5f24ae84
core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 35dc071b1056e775326981573c9618d8046e601d
Diff: https://reviews.apache.org/r/23339/diff/ Testing --- Thanks, Sriharsha Chintalapani
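As an aside for readers following the review, here is a minimal sketch of the backward-compatible constructor pattern Jun asks for above. The class and parameter names below are simplified stand-ins for illustration, not the actual MetadataRequest/TopicMetadataRequest classes touched by the patch.
{code}
// Illustration only: keep the old constructor for existing callers and add one that
// carries a createTopic flag, so read-only tools can fetch metadata without
// triggering auto-creation of missing topics.
class ExampleTopicMetadataRequest(val topics: Seq[String], val createTopic: Boolean) {
  // Old constructor preserved; defaults to the previous behaviour (auto-create on lookup).
  def this(topics: Seq[String]) = this(topics, true)
}

// A tool such as GetOffsetShell would then request metadata without creating the topic:
val request = new ExampleTopicMetadataRequest(Seq("some-topic"), createTopic = false)
{code}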
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075875#comment-14075875 ] saurabh agarwal commented on KAFKA-1555: There are two configuration parameters: dfs.replication and dfs.replication.min. The behavior you described above relates to the dfs.replication configuration. dfs.replication.min enforces that a minimum number of replicas must be written; only then does the write succeed, otherwise it fails. Here is an excerpt from Tom White's Hadoop book: It's possible, but unlikely, that multiple datanodes fail while a block is being written. As long as dfs.replication.min replicas (which default to one) are written, the write will succeed, and the block will be asynchronously replicated across the cluster until its target replication factor is reached (dfs.replication, which defaults to three). As you suggest, we can increase the replication factor; that will reduce the possibility of data loss, but it does not guarantee that there is more than one copy of the data. Acks=-1 ensures that it will receive an ack from the replicas in the ISR. What I am suggesting is that, using a new config min.isr.required, Kafka ensures the message has been written to a minimum number of replicas (which must be in the ISR) before producer.send is considered successful. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Neha Narkhede In a mission critical application, we expect a kafka cluster with 3 brokers to satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as two brokers being down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.2#6252)
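For context on the acks discussion above, here is a minimal sketch of an acks=-1 producer using the 0.8-era Scala client. The broker-side minimum-ISR check proposed in the comment (min.isr.required) is not part of this configuration and would be an additional, assumed setting.
{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// request.required.acks=-1 makes the leader wait for all replicas currently in the ISR
// before acknowledging; as argued above, this alone does not help if the ISR has
// shrunk to a single replica.
val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092")
props.put("request.required.acks", "-1")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("critical-topic", "key", "value"))
producer.close()
{code}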
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1414: --- Resolution: Fixed Assignee: Anton Karamanov (was: Jay Kreps) Status: Resolved (was: Patch Available) Thanks for patch v5. +1 and committed to trunk. Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Anton Karamanov Fix For: 0.8.2 Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, KAFKA-1414-rev5.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure the broker takes far too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.2#6252)