[jira] [Created] (KAFKA-7411) Change system to future and change back will make replication not working
Pengwei created KAFKA-7411: -- Summary: Change system to future and change back will make replication not working Key: KAFKA-7411 URL: https://issues.apache.org/jira/browse/KAFKA-7411 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 2.0.0, 1.1.1 Reporter: Pengwei When we set one of the follower's system time to a future time for a while and then set it back, we find that replication stops working. This is because the replication thread checks buildFetchRequest -> partitionFetchState.isReadyForFetch, and the DelayedItem's dueMs was computed while the clock was in the future. After the system time is changed back, the DelayedItem's getDelay function still returns a large delay: def getDelay(unit: TimeUnit): Long = { unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) } because dueMs is still a future time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
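For illustration, here is a minimal, hypothetical Scala sketch (DelayedItemSketch and ClockJumpDemo are made-up names, not Kafka classes) of why a dueMs captured while the clock was in the future keeps the partition un-fetchable after the clock is corrected:

import java.util.concurrent.TimeUnit

// A minimal sketch, not Kafka's actual DelayedItem: dueMs is an absolute wall-clock
// deadline, so a deadline computed while the system clock was set to the future stays
// far ahead after the clock is corrected, and getDelay keeps returning a huge value.
class DelayedItemSketch(delayMs: Long, nowMs: Long) {
  private val dueMs = nowMs + delayMs

  def getDelay(currentMs: Long, unit: TimeUnit): Long =
    unit.convert(math.max(dueMs - currentMs, 0), TimeUnit.MILLISECONDS)
}

object ClockJumpDemo extends App {
  val realNow   = System.currentTimeMillis()
  val futureNow = realNow + 24L * 60 * 60 * 1000          // clock was one day ahead
  val item      = new DelayedItemSketch(delayMs = 500, nowMs = futureNow)

  // After the clock is set back, the remaining delay is ~one day instead of 500 ms,
  // so partitionFetchState.isReadyForFetch stays false and replication never resumes.
  println(item.getDelay(realNow, TimeUnit.MILLISECONDS))
}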
[jira] [Created] (KAFKA-5752) Delete topic and re-create topic immediate will delete the new topic's timeindex
Pengwei created KAFKA-5752: -- Summary: Delete topic and re-create topic immediate will delete the new topic's timeindex Key: KAFKA-5752 URL: https://issues.apache.org/jira/browse/KAFKA-5752 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0, 0.10.2.0 Reporter: Pengwei When we delete a topic and immediately re-create a topic with the same name, we find that once the async delete of the old topic finishes, it removes the newly created topic's time index. This is because LogManager's asyncDelete changes the log and offset index file pointers to the renamed files but misses the time index, which causes this issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
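As a rough illustration of the fix direction, here is a hedged Scala sketch (SegmentFiles is an illustrative type, not Kafka's Log/LogManager code) of re-pointing every per-segment file, including the time index, at the renamed files before the scheduled delete runs:

import java.io.File

// A minimal sketch, not Kafka's actual asyncDelete: after the old topic's files are
// renamed for async deletion, every per-segment file handle has to be re-pointed at the
// renamed files. The bug described above is that the time index pointer was not updated,
// so the scheduled delete later removed the file at the original path, which by then
// belonged to the freshly re-created topic.
case class SegmentFiles(var log: File, var offsetIndex: File, var timeIndex: File) {
  def moveTo(renamedDir: File): Unit = {
    log         = new File(renamedDir, log.getName)
    offsetIndex = new File(renamedDir, offsetIndex.getName)
    timeIndex   = new File(renamedDir, timeIndex.getName)   // the missing step
  }
}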
[jira] [Created] (KAFKA-5553) Delete topic failed to change from OnlineReplica to ReplicaDeletionStarted if ISR not created
Pengwei created KAFKA-5553: -- Summary: Delete topic failed to change from OnlineReplica to ReplicaDeletionStarted if ISR not created Key: KAFKA-5553 URL: https://issues.apache.org/jira/browse/KAFKA-5553 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.11.0.0, 0.10.2.0, 0.9.0.1 Reporter: Pengwei We found the error log as follow and the topic can not remove for a long time: [2016-07-11 20:17:52,965] ERROR Controller 1328 epoch 315 initiated state change of replica 1328 for partition [websocket_test_topic,0] from OnlineReplica to ReplicaDeletionStarted failed (state.change.logger) java.lang.AssertionError: assertion failed: Replica [Topic=websocket_test_topic,Partition=0,Replica=1328] should be in the OfflineReplica states before moving to ReplicaDeletionStarted state. Instead it is in OnlineReplica state at scala.Predef$.assert(Predef.scala:165) at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309) at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190) at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322) at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334) at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367) at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313) at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312) at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403) at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
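For reference, a minimal Scala sketch of the precondition that fires in the stack trace above; the names mirror ReplicaStateMachine, but this is not the actual controller code:

// Deletion may only start from OfflineReplica, so a replica that never left
// OnlineReplica (for example because its ISR path was never created) trips the
// assertion and the topic deletion stalls.
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState

  def assertValidPreviousStates(current: ReplicaState,
                                validPrevious: Set[ReplicaState],
                                target: ReplicaState): Unit =
    require(validPrevious.contains(current),
      s"Replica should be in ${validPrevious.mkString(" or ")} before moving to $target, " +
        s"instead it is in $current")

  // The transition attempted in the log above would fail like this:
  // assertValidPreviousStates(OnlineReplica, Set(OfflineReplica), ReplicaDeletionStarted)
}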
[jira] [Created] (KAFKA-5425) Kafka replication support all log segment files sync with leader
Pengwei created KAFKA-5425: -- Summary: Kafka replication support all log segment files sync with leader Key: KAFKA-5425 URL: https://issues.apache.org/jira/browse/KAFKA-5425 Project: Kafka Issue Type: Improvement Reporter: Pengwei Priority: Minor Currently Kafka replication only lets the follower sync with the leader from the latest fetch offset, on the assumption that messages below this fetch offset are already in sync. After catching up with the leader, the follower advances the fetch offset to the log end offset. But if log segments whose latest offset is smaller than the current fetch offset are deleted on the follower, replication will not sync these log segments back. In this case there is a risk of losing messages if the follower becomes the leader: a consumer reading from beginning to end will find some messages missing. Can we improve the Kafka replication mechanism to ensure all of the follower's log segments match the leader's, so that even if some log segments are deleted they are recovered from the leader? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time
[ https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-4862: --- Status: Patch Available (was: Open) > Kafka client connect to a shutdown node will block for a long time > -- > > Key: KAFKA-4862 > URL: https://issues.apache.org/jira/browse/KAFKA-4862 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0, 0.9.0.0 >Reporter: Pengwei >Assignee: Pengwei > Fix For: 0.11.0.0 > > > Currently in our test env, we found after one of the broker node crash(reboot > or os crash), the client maybe still connecting to the crash node to send > metadata request or other request, and it need about several minutes to > aware the connection is timeout then try another node to connect to send the > request. Then the client may still not aware the metadata change after > several minutes. > We don't have a connection timeout for the network client, we should add a > connection timeout for the client -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5014) SSL Channel not ready but tcp is established and the server is hung will not sending metadata
Pengwei created KAFKA-5014: -- Summary: SSL Channel not ready but tcp is established and the server is hung will not sending metadata Key: KAFKA-5014 URL: https://issues.apache.org/jira/browse/KAFKA-5014 Project: Kafka Issue Type: Bug Affects Versions: 0.10.2.0, 0.9.0.1 Reporter: Pengwei Priority: Minor In our test env, QA hung one of the brokers the producer was connecting to; the producer then got stuck in the send method and threw the exception: failed to update metadata after the request timeout. The reason is as follows: when the producer chooses a broker to fetch metadata from, it connects to that broker, but the broker is hung; the TCP connection succeeds and the network client marks the broker as connected, yet the SSL handshake never completes, so the channel is not ready. leastLoadedNode then keeps choosing this connected node to send the metadata request, even though its channel is not ready. As a result the producer is stuck fetching metadata and never tries another node. The client should not get stuck when only one node is hung. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
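A minimal Scala sketch of the behaviour this report asks for (NodeState and leastLoadedUsableNode are illustrative names, not the real NetworkClient API): only treat a node as usable for metadata requests once its channel, not just its socket, is ready:

object LeastLoadedSketch {
  // A node whose TCP connection is established but whose SSL channel has not finished
  // the handshake should not be preferred for metadata requests; otherwise the client
  // keeps picking the same stuck broker and never fetches metadata from anyone else.
  case class NodeState(id: Int, tcpConnected: Boolean, channelReady: Boolean, inFlight: Int)

  def leastLoadedUsableNode(nodes: Seq[NodeState]): Option[NodeState] =
    nodes
      .filter(n => n.tcpConnected && n.channelReady) // require the channel, not just the socket
      .sortBy(_.inFlight)
      .headOption                                    // None => try connecting to another node
}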
[jira] [Comment Edited] (KAFKA-4790) Kafka cannot recover after a disk full
[ https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904333#comment-15904333 ] Pengwei edited comment on KAFKA-4790 at 3/10/17 10:54 AM: -- [~junrao] If KIP-98 can fix this issue, we can wait for KIP-98. There is a way to avoid this issue: we should make sure that log.segment.bytes / log.index.interval.bytes <= log.index.size.max.bytes / 8, where 8 bytes is the size of each index entry. In our case, having set log.index.size.max.bytes to 1MB, we should set log.segment.bytes to 512MB instead of 1G. was (Author: pengwei): [~junrao] If KIP-98 can fix this issue, we can wait for the KIP-98.There is a method to avoid this issue, we should make sure the log.segment.bytes / log.index.interval.bytes >= log.index.size.max.bytes / 8 8 bytes is every index item size In our case, we should set log.segment.bytes to 512MB instead of 1G, after set log.index.size.max.bytes to 1MB > Kafka cannot recover after a disk full > -- > > Key: KAFKA-4790 > URL: https://issues.apache.org/jira/browse/KAFKA-4790 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1, 0.10.1.1 >Reporter: Pengwei > Labels: reliability > Fix For: 0.11.0.0 > > > [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager) > [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. > (kafka.log.Log) > [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads > during logs loading: java.lang.IllegalArgumentException: requirement failed: > Attempt to append to a full index (size = 128000). (kafka.log.LogManager) > [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. > Prepare to shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed: Attempt to append to > a full index (size = 128000). > at scala.Predef$.require(Predef.scala:219) > at > kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at kafka.log.OffsetIndex.append(OffsetIndex.scala:199) > at kafka.log.LogSegment.recover(LogSegment.scala:191) > at kafka.log.Log.recoverLog(Log.scala:259) > at kafka.log.Log.loadSegments(Log.scala:234) > at kafka.log.Log.(Log.scala:92) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
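As a quick sanity check of the rule of thumb above, a small Scala sketch with the values from this report (4096-byte index interval, 1,024,000-byte index file, 8 bytes per entry); the numbers are illustrative, not a recommendation:

object IndexSizingSketch extends App {
  val indexIntervalBytes = 4096L                                 // log.index.interval.bytes (default)
  val indexSizeMaxBytes  = 1024000L                              // log.index.size.max.bytes in this report
  val maxEntries         = indexSizeMaxBytes / 8                 // 128,000 entries fit in the index file
  val maxIndexableBytes  = maxEntries * indexIntervalBytes       // 524,288,000 bytes, about 500 MiB
  println(s"a segment larger than $maxIndexableBytes bytes can overflow the index on recovery")
}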
[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full
[ https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904333#comment-15904333 ] Pengwei commented on KAFKA-4790: [~junrao] If KIP-98 can fix this issue, we can wait for the KIP-98.There is a method to avoid this issue, we should make sure the log.segment.bytes / log.index.interval.bytes >= log.index.size.max.bytes / 8 8 bytes is every index item size In our case, we should set log.segment.bytes to 512MB instead of 1G, after set log.index.size.max.bytes to 1MB > Kafka cannot recover after a disk full > -- > > Key: KAFKA-4790 > URL: https://issues.apache.org/jira/browse/KAFKA-4790 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1, 0.10.1.1 >Reporter: Pengwei > Labels: reliability > > [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager) > [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. > (kafka.log.Log) > [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads > during logs loading: java.lang.IllegalArgumentException: requirement failed: > Attempt to append to a full index (size = 128000). (kafka.log.LogManager) > [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. > Prepare to shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed: Attempt to append to > a full index (size = 128000). > at scala.Predef$.require(Predef.scala:219) > at > kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at kafka.log.OffsetIndex.append(OffsetIndex.scala:199) > at kafka.log.LogSegment.recover(LogSegment.scala:191) > at kafka.log.Log.recoverLog(Log.scala:259) > at kafka.log.Log.loadSegments(Log.scala:234) > at kafka.log.Log.(Log.scala:92) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time
Pengwei created KAFKA-4862: -- Summary: Kafka client connect to a shutdown node will block for a long time Key: KAFKA-4862 URL: https://issues.apache.org/jira/browse/KAFKA-4862 Project: Kafka Issue Type: Bug Affects Versions: 0.10.2.0, 0.9.0.0 Reporter: Pengwei Assignee: Pengwei Fix For: 0.11.0.0 In our test env we found that after one of the broker nodes crashes (reboot or OS crash), the client may keep connecting to the crashed node to send metadata or other requests, and it takes several minutes before it notices the connection has timed out and tries another node. So the client may remain unaware of the metadata change for several minutes. There is no connection timeout in the network client; we should add one for the client. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
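A minimal Scala sketch of the proposed client-side connect timeout (the names and the 10 s value are illustrative, not the real NetworkClient):

import scala.collection.mutable

// Remember when each connection attempt started and treat sockets that have not
// finished connecting within a client-side connect timeout as failed, so the client
// moves on to another broker instead of waiting minutes for the OS-level TCP timeout.
object ConnectTimeoutSketch {
  private val connectingSince  = mutable.Map.empty[Int, Long]  // nodeId -> attempt start (ms)
  private val connectTimeoutMs = 10000L                        // illustrative value

  def connectionStarted(nodeId: Int, nowMs: Long): Unit =
    connectingSince(nodeId) = nowMs

  def checkConnectTimeouts(nowMs: Long): Seq[Int] = {
    val timedOut = connectingSince.collect {
      case (node, start) if nowMs - start > connectTimeoutMs => node
    }.toSeq
    timedOut.foreach(connectingSince.remove)                   // mark failed; caller picks another node
    timedOut
  }
}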
[jira] [Commented] (KAFKA-4790) Kafka can't not recover after a disk full
[ https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880567#comment-15880567 ] Pengwei commented on KAFKA-4790: The index max file size is set to 1MB: log.index.size.max.bytes=1024000 The reason I found is below: 1. The producer batches a lot of messages into Kafka (for example 512k per write), so every write is larger than the 4k index write interval and adds only one index entry; the segment ends up with, for example, 2050 index entries. 2. At the same time the disk becomes full, and Kafka dies before the recovery point is flushed to disk. 3. On restart, the recovery function re-checks every message to re-append index entries into the index file, writing one entry per 4k of messages, so the total number of index entries far exceeds 2050 and may exceed the index file's maximum number of entries. > Kafka can't not recover after a disk full > - > > Key: KAFKA-4790 > URL: https://issues.apache.org/jira/browse/KAFKA-4790 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1, 0.10.1.1 >Reporter: Pengwei > Labels: reliability > Fix For: 0.10.2.1 > > > [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager) > [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. > (kafka.log.Log) > [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads > during logs loading: java.lang.IllegalArgumentException: requirement failed: > Attempt to append to a full index (size = 128000). (kafka.log.LogManager) > [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. > Prepare to shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed: Attempt to append to > a full index (size = 128000). > at scala.Predef$.require(Predef.scala:219) > at > kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at kafka.log.OffsetIndex.append(OffsetIndex.scala:199) > at kafka.log.LogSegment.recover(LogSegment.scala:191) > at kafka.log.Log.recoverLog(Log.scala:259) > at kafka.log.Log.loadSegments(Log.scala:234) > at kafka.log.Log.(Log.scala:92) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
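A rough Scala sketch of the arithmetic behind this, under the assumptions stated in the comment (1 GB segment, 4 KB index interval, 1,024,000-byte index file, 8 bytes per entry):

// Normal appends add one entry per large producer batch, but the recovery path in
// LogSegment.recover re-walks the segment and adds roughly one entry per index
// interval of data, which needs far more entries than the index file can hold.
object RecoveryOverflowSketch extends App {
  val segmentBytes       = 1L * 1024 * 1024 * 1024           // 1 GB segment
  val indexIntervalBytes = 4096L                              // log.index.interval.bytes
  val maxEntries         = 1024000L / 8                       // 128,000 entries fit in the index file
  val entriesOnRecovery  = segmentBytes / indexIntervalBytes  // ~262,144 entries needed
  println(entriesOnRecovery > maxEntries)                     // true -> "Attempt to append to a full index"
}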
[jira] [Created] (KAFKA-4790) Kafka can't not recover after a disk full
Pengwei created KAFKA-4790: -- Summary: Kafka can't not recover after a disk full Key: KAFKA-4790 URL: https://issues.apache.org/jira/browse/KAFKA-4790 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.1, 0.9.0.1 Reporter: Pengwei Fix For: 0.10.2.1 [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager) [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. (kafka.log.Log) [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads during logs loading: java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index (size = 128000). (kafka.log.LogManager) [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index (size = 128000). at scala.Predef$.require(Predef.scala:219) at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) at kafka.log.OffsetIndex.append(OffsetIndex.scala:199) at kafka.log.LogSegment.recover(LogSegment.scala:191) at kafka.log.Log.recoverLog(Log.scala:259) at kafka.log.Log.loadSegments(Log.scala:234) at kafka.log.Log.(Log.scala:92) at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201) at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15817467#comment-15817467 ] Pengwei edited comment on KAFKA-3042 at 1/11/17 7:27 AM: - [~lindong] It seems I still encounter this issue after applying your patch in our test environment. The log keeps printing the "Cached zkVersion " message and cannot recover. The reproduction step we used is to cut off three brokers' network for some time and then restore it; afterwards we found one of the brokers keeps printing the cached zkVersion log. was (Author: pengwei): [~lindong] It seems I still encounter this issue after apply your patch in our test enviroment The log is also logging the message "Cached zkVersion " and cannot recover > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao >Assignee: Dong Lin > Labels: reliability > Fix For: 0.10.2.0 > > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei reopened KAFKA-3042: > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao >Assignee: Dong Lin > Labels: reliability > Fix For: 0.10.2.0 > > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15817467#comment-15817467 ] Pengwei commented on KAFKA-3042: [~lindong] It seems I still encounter this issue after apply your patch in our test enviroment The log is also logging the message "Cached zkVersion " and cannot recover > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao >Assignee: Dong Lin > Labels: reliability > Fix For: 0.10.2.0 > > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811007#comment-15811007 ] Pengwei commented on KAFKA-4595: One method is to add a lock timeout method instead of blocking locking in the deleteTopicStopReplicaCallback。 If timeout, this call is failed and the RequestSendThread can be stop after this round. And it seems like to be the broker have receive the stop replicas command, but does not response the result to the controller, if the broker recover, it can receive the stop replicas command again and response to the controller. > Controller send thread can't stop when broker change listener event trigger > for dead brokers > - > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.1.1 >Reporter: Pengwei >Priority: Critical > Labels: reliability > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on > condition [0x7fb76b7c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0xc0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >Locked ownable synchronizers: > - <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] >java.lang.Thread.State: WAITING (parking) >
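A minimal Scala sketch of the timed-lock idea proposed in the comment above (the lock and method names are illustrative, not the actual controller code):

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// If the StopReplica callback took the controller lock with a timed tryLock instead of
// a blocking lock, the send thread could give up after a while, finish its current
// doWork() round, and then be shut down by the broker-change handler holding the lock.
object TimedLockSketch {
  private val controllerLock = new ReentrantLock()

  def deleteTopicStopReplicaCallbackSketch(): Unit = {
    if (controllerLock.tryLock(30, TimeUnit.SECONDS)) {
      try {
        // process the StopReplica response here
      } finally controllerLock.unlock()
    } else {
      // give up this round; the broker can be sent StopReplica again once it comes back
    }
  }
}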
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803816#comment-15803816 ] Pengwei commented on KAFKA-4595: [~huxi_2b] > Controller send thread can't stop when broker change listener event trigger > for dead brokers > - > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.1.1 >Reporter: Pengwei > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on > condition [0x7fb76b7c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0xc0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >Locked ownable synchronizers: > - <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at >
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803812#comment-15803812 ] Pengwei commented on KAFKA-4595: Yes, you are right. > Controller send thread can't stop when broker change listener event trigger > for dead brokers > - > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.1.1 >Reporter: Pengwei > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on > condition [0x7fb76b7c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0xc0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >Locked ownable synchronizers: > - <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) >
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15801343#comment-15801343 ] Pengwei commented on KAFKA-4595: The broker change event is trigger to remove dead brokers 1003, it lock the controller lock and need to stop the Controller-1001-to-broker-1003-send-thread, but this send thread is calling the deleteTopicStopReplicaCallback, it also try to get the controller lock, so dead lock between these two operation. > Controller send thread can't stop when broker change listener event trigger > for dead brokers > - > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.1.1 >Reporter: Pengwei > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on > condition [0x7fb76b7c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0xc0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >Locked ownable synchronizers: > - <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at
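For illustration only, a small self-contained Scala program that reproduces the same lock/latch cycle with made-up names (it intentionally deadlocks when run; it is not controller code):

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock

// Thread A stands in for the ZkClient event thread handling the broker-change event,
// thread B for the controller-to-broker send thread running the StopReplica callback.
object ControllerDeadlockDemo extends App {
  val controllerLock = new ReentrantLock()
  val sendThreadDone = new CountDownLatch(1)

  val sendThread = new Thread(() => {
    controllerLock.lock()            // B: blocks, A already holds the controller lock
    try { /* StopReplica callback body */ } finally controllerLock.unlock()
    sendThreadDone.countDown()
  })

  controllerLock.lock()              // A: broker-change handler takes the controller lock
  sendThread.start()
  try sendThreadDone.await()         // A: waits for B to shut down -> neither proceeds
  finally controllerLock.unlock()
}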
[jira] [Created] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
Pengwei created KAFKA-4595: -- Summary: Controller send thread can't stop when broker change listener event trigger for dead brokers Key: KAFKA-4595 URL: https://issues.apache.org/jira/browse/KAFKA-4595 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.1, 0.9.0.0 Reporter: Pengwei Fix For: 0.10.2.0 In our test env, we found controller is not working after a delete topic opertation and network issue, the stack is below: "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on condition [0x7fb76b7c8000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xc05497b8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) - locked <0xc0258760> (a java.lang.Object) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Locked ownable synchronizers: - <0xc02587f8> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync) "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) at
[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-4229: --- Status: Patch Available (was: In Progress) PR : https://github.com/apache/kafka/pull/2175 > Controller can't start after several zk expired event > - > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.9.0.0 >Reporter: Pengwei >Assignee: Pengwei > > We found the controller not started after several zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
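A minimal Scala sketch of the failure mode in the reproduction steps and one possible fix direction (ControllerZnode and electSketch are illustrative names, not the actual ZookeeperLeaderElector):

// After the expired-session handler resigns and re-elects, election finds that
// /controller already exists -- but the node was written moments earlier by this
// broker's own new session in handleDataDeleted, so nobody deletes it and no broker
// ever becomes controller again. Recognising a /controller node owned by the current
// session avoids that dead end.
object ControllerElectSketch {
  case class ControllerZnode(ownerSessionId: Long, brokerId: Int)

  def electSketch(existing: Option[ControllerZnode], mySessionId: Long, myBrokerId: Int): Boolean =
    existing match {
      case None =>
        true                               // create /controller and become leader
      case Some(z) if z.ownerSessionId == mySessionId && z.brokerId == myBrokerId =>
        true                               // the node is ours already: keep leadership
      case Some(_) =>
        false                              // another broker is controller
    }
}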
[jira] [Work started] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4229 started by Pengwei. -- > Controller can't start after several zk expired event > - > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei >Assignee: Pengwei > > We found the controller not started after several zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-4229: --- Reviewer: Guozhang Wang > Controller can't start after several zk expired event > - > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei >Assignee: Pengwei > > We found the controller not started after several zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15698992#comment-15698992 ] Pengwei commented on KAFKA-4229: The PR is : https://github.com/apache/kafka/pull/2175 > Controller can't start after several zk expired event > - > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei >Assignee: Pengwei > > We found the controller not started after several zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571265#comment-15571265 ] Pengwei commented on KAFKA-4229: We test it on 0.9.0.0, but I found the controller code are nearly the same between these versions. In 0.9.0.0, zk version is 3.4.6 > Controller can't start after several zk expired event > - > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei >Assignee: Pengwei > > We found the controller not started after several zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4229) Controller can't start after serveral zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532241#comment-15532241 ] Pengwei commented on KAFKA-4229: I already have a patch for this issue, maybe the issue can assign to me? > Controller can't start after serveral zk expired event > -- > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei > > We found the controller not started after serveral zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-4229: --- Fix Version/s: 0.10.1.0 > Controller can't start after serveral zk expired event > -- > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei > > We found the controller not started after serveral zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event
[ https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-4229: --- Fix Version/s: (was: 0.10.1.0) > Controller can't start after serveral zk expired event > -- > > Key: KAFKA-4229 > URL: https://issues.apache.org/jira/browse/KAFKA-4229 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Pengwei > > We found the controller not started after serveral zk expired event in our > test environment. By analysing the log, I found the controller will handle > the ephemeral node data delete event first and then the zk expired event , > then the controller will gone. > I can reproducer it on my develop env: > 1. set up a one broker and one zk env, specify a very large zk timeout (20s) > 2. stop the broker and remove the zk's /broker/ids/0 directory > 3. restart the broker and make a breakpoint in the zk client's event thread > to queue the delete event. > 4. after the /controller node gone the breakpoint will hit. > 5. expired the current session(suspend the send thread) and create a new > session s2 > 6. resume the event thread, then the controller will handle > LeaderChangeListener.handleDataDeleted and become leader > 7. then controller will handle SessionExpirationListener.handleNewSession, it > resign the controller and elect, but when elect it found the /controller > node is exist and not become the leader. But the /controller node is created > by current session s2 will not remove. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4229) Controller can't start after several zk expired event
Pengwei created KAFKA-4229: -- Summary: Controller can't start after several zk expired event Key: KAFKA-4229 URL: https://issues.apache.org/jira/browse/KAFKA-4229 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.9.0.0 Reporter: Pengwei We found the controller did not start after several zk expired events in our test environment. By analysing the log, I found the controller handles the ephemeral node data delete event first and then the zk expired event, and then the controller is gone. I can reproduce it on my develop env: 1. set up a one-broker, one-zk env and specify a very large zk timeout (20s) 2. stop the broker and remove the zk's /broker/ids/0 directory 3. restart the broker and set a breakpoint in the zk client's event thread to queue the delete event. 4. after the /controller node is gone, the breakpoint will hit. 5. expire the current session (suspend the send thread) and create a new session s2 6. resume the event thread; the controller will then handle LeaderChangeListener.handleDataDeleted and become leader 7. the controller will then handle SessionExpirationListener.handleNewSession: it resigns the controller and elects, but during election it finds the /controller node already exists and so does not become the leader. But the /controller node created by the current session s2 will never be removed. So the controller is gone -- This message was sent by Atlassian JIRA (v6.3.4#6332)
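The failure described above boils down to a single decision: after the session expires, the broker's own new session has already created /controller via the delete-event path, so the subsequent re-election wrongly concludes that another broker is leader. Below is a minimal, self-contained Scala sketch of that check; ControllerElectSketch, ZnodeRead and shouldBecomeController are hypothetical names for illustration, not the actual KafkaController API.

{code}
// Sketch only: if /controller already exists but carries this broker's own id
// (written by its own new session), keep leadership instead of resigning and
// leaving an orphaned /controller node behind.
object ControllerElectSketch {
  final case class ZnodeRead(exists: Boolean, brokerId: Option[Int])

  /** Returns true if this broker should consider itself the controller. */
  def shouldBecomeController(myBrokerId: Int,
                             readControllerZnode: () => ZnodeRead,
                             createControllerZnode: () => Boolean): Boolean = {
    if (createControllerZnode()) {
      true // won the election outright
    } else {
      val node = readControllerZnode()
      // The surviving node was written by this broker's own (new) session, so the
      // earlier handleDataDeleted path already made us leader; keep leadership.
      node.exists && node.brokerId.contains(myBrokerId)
    }
  }

  def main(args: Array[String]): Unit = {
    val existing = ZnodeRead(exists = true, brokerId = Some(1))
    println(shouldBecomeController(1, () => existing, () => false)) // true
    println(shouldBecomeController(2, () => existing, () => false)) // false
  }
}
{code}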
[jira] [Commented] (KAFKA-1429) Yet another deadlock in controller shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390887#comment-15390887 ] Pengwei commented on KAFKA-1429: hi Jun, I have open a PR for this issue. Can you review it? thanks~ > Yet another deadlock in controller shutdown > --- > > Key: KAFKA-1429 > URL: https://issues.apache.org/jira/browse/KAFKA-1429 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.1 >Reporter: Dmitry Bugaychenko >Assignee: Neha Narkhede > Attachments: kafka_0.9.0.0_controller_dead_lock.patch > > > Found one more case of deadlock in controller during shutdown: > {code} > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 state=TIMED_WAITING > - waiting on <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > - locked <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) > at > java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468) > at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067) > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Locked synchronizers: count = 1 > - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a > kafka-scheduler-0 id=172 state=WAITING > - waiting on <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > - locked <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > owned by > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) > at 
kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1108) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1108) > at >
[jira] [Created] (KAFKA-3913) Old consumer's metrics error when using IPv6
Pengwei created KAFKA-3913: -- Summary: Old consumer's metrics error when using IPv6 Key: KAFKA-3913 URL: https://issues.apache.org/jira/browse/KAFKA-3913 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.10.0.0, 0.9.0.1 Reporter: Pengwei Priority: Minor Fix For: 0.10.1.0 The error is below: [2016-05-09 15:49:20,096] WARN Property security.protocol is not valid (kafka.utils.VerifiableProperties) [2016-05-09 15:49:20,882] WARN Error processing kafka.server:type=FetcherStats,name=RequestsPerSec,clientId=console-consumer-32775,brokerHost=fe80::92e2:baff:fe07:51cc,brokerPort=9093 (com.yammer.metrics.reporting.JmxReporter) javax.management.MalformedObjectNameException: Invalid character ':' in value part of property at javax.management.ObjectName.construct(ObjectName.java:618) at javax.management.ObjectName.(ObjectName.java:1382) at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395) at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516) at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491) at com.yammer.metrics.core.MetricsRegistry.newMeter(MetricsRegistry.java:240) at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:79) at kafka.server.FetcherStats.newMeter(AbstractFetcherThread.scala:264) at kafka.server.FetcherStats.(AbstractFetcherThread.scala:269) at kafka.server.AbstractFetcherThread.(AbstractFetcherThread.scala:55) at kafka.consumer.ConsumerFetcherThread.(ConsumerFetcherThread.scala:38) at kafka.consumer.ConsumerFetcherManager.createFetcherThread(ConsumerFetcherManager.scala:118) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
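The MalformedObjectNameException above comes from the ':' characters of the IPv6 brokerHost being placed unquoted in a JMX ObjectName property value. The following self-contained Scala sketch reproduces the symptom and shows one plausible workaround, quoting the value with javax.management.ObjectName.quote; it is an illustration of the idea, not the actual Kafka metrics code.

{code}
import javax.management.ObjectName

object Ipv6MetricNameSketch {
  def main(args: Array[String]): Unit = {
    val host = "fe80::92e2:baff:fe07:51cc"

    // Unquoted value: rejected with exactly the "Invalid character ':' in value
    // part of property" failure shown in the log above.
    try {
      val broken = new ObjectName(
        s"kafka.server:type=FetcherStats,brokerHost=$host,brokerPort=9093")
      println("unexpected: " + broken)
    } catch { case e: Exception => println("unquoted fails: " + e.getMessage) }

    // Quoted value: accepted, the ':' characters live inside a quoted string.
    val ok = new ObjectName(
      s"kafka.server:type=FetcherStats,brokerHost=${ObjectName.quote(host)},brokerPort=9093")
    println("quoted ok: " + ok)
  }
}
{code}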
[jira] [Updated] (KAFKA-1429) Yet another deadlock in controller shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-1429: --- Attachment: kafka_0.9.0.0_controller_dead_lock.patch upload file is the patch to fix this bug, can somebody review it ? > Yet another deadlock in controller shutdown > --- > > Key: KAFKA-1429 > URL: https://issues.apache.org/jira/browse/KAFKA-1429 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.1 >Reporter: Dmitry Bugaychenko >Assignee: Neha Narkhede > Attachments: kafka_0.9.0.0_controller_dead_lock.patch > > > Found one more case of deadlock in controller during shutdown: > {code} > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 state=TIMED_WAITING > - waiting on <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > - locked <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) > at > java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468) > at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067) > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Locked synchronizers: count = 1 > - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a > kafka-scheduler-0 id=172 state=WAITING > - waiting on <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > - locked <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > owned by > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) > at 
java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1108) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1108) > at >
[jira] [Commented] (KAFKA-1429) Yet another deadlock in controller shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331523#comment-15331523 ] Pengwei commented on KAFKA-1429: I can still reproduce this issue in 0.9.0.0 or 0.9.0.1, Controller session expire and holding the controllerLock lock and try to wait the autoRebalanceScheduler thread to shutdown, but the autoRebalanceScheduler is running for the parttion rebalance and this thread want the controllerLock : "ZkClient-EventThread-18-9.94.1.21:2181,9.94.1.22:2181,9.94.1.23:2181/oms-cluster-1" #18 daemon prio=5 os_prio=0 tid=0x00c3f800 nid=0x49be waiting on condition [0x7f466603d000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xd9eaace0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465) at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98) at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:370) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1171) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1170) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1170) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1170) at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:734) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) "kafka-scheduler-12" #143 daemon prio=5 os_prio=0 tid=0x7f465c4cc000 nid=0x566f waiting on condition [0x7f466260e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0xc0a9a480> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1198) at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1194) at scala.collection.immutable.Map$Map3.foreach(Map.scala:161) at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1194) at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:344) at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) > Yet another deadlock in controller shutdown > --- > > Key: KAFKA-1429 > URL: https://issues.apache.org/jira/browse/KAFKA-1429 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.1 >Reporter: Dmitry Bugaychenko >Assignee: Neha Narkhede > > Found one
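The two stack dumps above show a classic lock-ordering deadlock: the ZK event thread holds the controller lock and waits for the scheduler to terminate, while the scheduled rebalance task blocks forever trying to take that same lock. The Scala sketch below illustrates one way out under that assumption: the periodic task takes the lock with tryLock, so a shutdown that already holds the lock can always drain the scheduler. It is a self-contained toy, not the attached patch.

{code}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object RebalanceLockSketch {
  private val controllerLock = new ReentrantLock()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  private val rebalanceTask = new Runnable {
    override def run(): Unit = {
      // tryLock instead of lock(): skip this round if resignation holds the lock.
      if (controllerLock.tryLock(100, TimeUnit.MILLISECONDS)) {
        try println("rebalance check ran")
        finally controllerLock.unlock()
      } else println("controller busy, skipping rebalance round")
    }
  }

  def main(args: Array[String]): Unit = {
    scheduler.scheduleAtFixedRate(rebalanceTask, 0, 200, TimeUnit.MILLISECONDS)
    Thread.sleep(500)

    // Resignation path: hold the lock, then wait for the scheduler to drain.
    controllerLock.lock()
    try {
      scheduler.shutdown()
      // With tryLock in the task this completes promptly; with a blocking lock()
      // the task would wait on us while we wait on it, which is the deadlock above.
      println("terminated: " + scheduler.awaitTermination(2, TimeUnit.SECONDS))
    } finally controllerLock.unlock()
  }
}
{code}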
[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller
[ https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275879#comment-15275879 ] Pengwei commented on KAFKA-3585: do you used the kafka-server-stop.sh to shutdown? > Shutdown slow when there is only one broker which is controller > --- > > Key: KAFKA-3585 > URL: https://issues.apache.org/jira/browse/KAFKA-3585 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Pengwei >Assignee: Taiyuan Zhang >Priority: Minor > Fix For: 0.10.0.1 > > > Reproducer Step: > 1. Install 3 brokers's cluster > 2. create a topic with 3 partition > 3. shutdown the broker one by one , you will find the last one shutdown very > slow because of error: > [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to > move: > __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2 > (kafka.server.KafkaServer) > [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: > 0 (kafka.server.KafkaServer) > [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown > after the previous attempt failed... (kafka.server.KafkaServer) > [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean > shutdown as all the controlled shutdown attempts failed > (kafka.server.KafkaServer) > it is determined by : > controlled.shutdown.retry.backoff.ms = 5000 > controlled.shutdown.max.retries=3 > It slow because the last one can not elect the new leader for the remaining > partitions , the last one can improve to shutdown quickly, we can skip the > shutdown error when it is the last broker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (KAFKA-3585) Shutdown slow when there is only one broker which is controller
[ https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-3585: --- Comment: was deleted (was: do you used the kafka-server-stop.sh to shutdown?) > Shutdown slow when there is only one broker which is controller > --- > > Key: KAFKA-3585 > URL: https://issues.apache.org/jira/browse/KAFKA-3585 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Pengwei >Assignee: Taiyuan Zhang >Priority: Minor > Fix For: 0.10.0.1 > > > Reproducer Step: > 1. Install 3 brokers's cluster > 2. create a topic with 3 partition > 3. shutdown the broker one by one , you will find the last one shutdown very > slow because of error: > [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to > move: > __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2 > (kafka.server.KafkaServer) > [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: > 0 (kafka.server.KafkaServer) > [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown > after the previous attempt failed... (kafka.server.KafkaServer) > [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean > shutdown as all the controlled shutdown attempts failed > (kafka.server.KafkaServer) > it is determined by : > controlled.shutdown.retry.backoff.ms = 5000 > controlled.shutdown.max.retries=3 > It slow because the last one can not elect the new leader for the remaining > partitions , the last one can improve to shutdown quickly, we can skip the > shutdown error when it is the last broker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller
[ https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275880#comment-15275880 ] Pengwei commented on KAFKA-3585: do you used the kafka-server-stop.sh to shutdown? > Shutdown slow when there is only one broker which is controller > --- > > Key: KAFKA-3585 > URL: https://issues.apache.org/jira/browse/KAFKA-3585 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Pengwei >Assignee: Taiyuan Zhang >Priority: Minor > Fix For: 0.10.0.1 > > > Reproducer Step: > 1. Install 3 brokers's cluster > 2. create a topic with 3 partition > 3. shutdown the broker one by one , you will find the last one shutdown very > slow because of error: > [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to > move: > __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2 > (kafka.server.KafkaServer) > [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: > 0 (kafka.server.KafkaServer) > [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown > after the previous attempt failed... (kafka.server.KafkaServer) > [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean > shutdown as all the controlled shutdown attempts failed > (kafka.server.KafkaServer) > it is determined by : > controlled.shutdown.retry.backoff.ms = 5000 > controlled.shutdown.max.retries=3 > It slow because the last one can not elect the new leader for the remaining > partitions , the last one can improve to shutdown quickly, we can skip the > shutdown error when it is the last broker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller
[ https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268171#comment-15268171 ] Pengwei commented on KAFKA-3585: Is the last broker become the controller ? I can reproducer this issue nearly every time > Shutdown slow when there is only one broker which is controller > --- > > Key: KAFKA-3585 > URL: https://issues.apache.org/jira/browse/KAFKA-3585 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.1 >Reporter: Pengwei >Assignee: Taiyuan Zhang >Priority: Minor > Fix For: 0.10.0.1 > > > Reproducer Step: > 1. Install 3 brokers's cluster > 2. create a topic with 3 partition > 3. shutdown the broker one by one , you will find the last one shutdown very > slow because of error: > [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to > move: > __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2 > (kafka.server.KafkaServer) > [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: > 0 (kafka.server.KafkaServer) > [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown > after the previous attempt failed... (kafka.server.KafkaServer) > [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean > shutdown as all the controlled shutdown attempts failed > (kafka.server.KafkaServer) > it is determined by : > controlled.shutdown.retry.backoff.ms = 5000 > controlled.shutdown.max.retries=3 > It slow because the last one can not elect the new leader for the remaining > partitions , the last one can improve to shutdown quickly, we can skip the > shutdown error when it is the last broker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3585) Shutdown slow when there is only one broker which is controller
Pengwei created KAFKA-3585: -- Summary: Shutdown slow when there is only one broker which is controller Key: KAFKA-3585 URL: https://issues.apache.org/jira/browse/KAFKA-3585 Project: Kafka Issue Type: Improvement Affects Versions: 0.9.0.1 Reporter: Pengwei Priority: Minor Fix For: 0.10.0.1 Reproduce steps: 1. Install a 3-broker cluster 2. create a topic with 3 partitions 3. shut down the brokers one by one; you will find the last one shuts down very slowly because of this error: [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to move: __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2 (kafka.server.KafkaServer) [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 0 (kafka.server.KafkaServer) [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown after the previous attempt failed... (kafka.server.KafkaServer) [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed (kafka.server.KafkaServer) The delay is determined by: controlled.shutdown.retry.backoff.ms = 5000 controlled.shutdown.max.retries=3 It is slow because the last broker cannot elect new leaders for the remaining partitions; the last broker could shut down quickly if we skip the shutdown error when it is the last broker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
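A compact Scala sketch of the improvement suggested in the last sentence above (assumed shape only, not the shipped fix): the controlled-shutdown retry loop returns immediately when this is the only live broker, instead of spending controlled.shutdown.max.retries times controlled.shutdown.retry.backoff.ms before falling back to an unclean shutdown. The liveBrokers and tryMovePartitions callbacks are hypothetical stand-ins for the real cluster probes.

{code}
object ControlledShutdownSketch {
  def controlledShutdown(maxRetries: Int, backoffMs: Long,
                         liveBrokers: () => Int,
                         tryMovePartitions: () => Boolean): Boolean = {
    var attempt = 0
    while (attempt < maxRetries) {
      if (tryMovePartitions()) return true
      if (liveBrokers() <= 1) {
        // Last broker standing: no other broker can take leadership, retrying is pointless.
        return false
      }
      attempt += 1
      Thread.sleep(backoffMs)
    }
    false
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    controlledShutdown(3, 5000L, liveBrokers = () => 1, tryMovePartitions = () => false)
    println(s"gave up after ${System.currentTimeMillis() - start} ms") // ~0 ms, not ~15 s
  }
}
{code}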
[jira] [Created] (KAFKA-3584) NullPointer Exception found when Delete Topic and Log delete concurrent
Pengwei created KAFKA-3584: -- Summary: NullPointer Exception found when Delete Topic and Log delete concurrent Key: KAFKA-3584 URL: https://issues.apache.org/jira/browse/KAFKA-3584 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 0.9.0.1 Reporter: Pengwei Assignee: Jay Kreps Priority: Minor Fix For: 0.10.0.1 [2016-03-19 17:23:45,760] ERROR Uncaught exception in scheduled task ‘kafka-log-retention’ (kafka.utils.KafkaScheduler) java.lang.NullPointerException at kafka.log.Log.activeSegment(Log.scala:824) at kafka.log.Log.deleteOldSegments(Log.scala:602) at kafka.log.LogManager.kafka$log$LogManager -- This message was sent by Atlassian JIRA (v6.3.4#6332)
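The stack trace suggests the kafka-log-retention task dereferences a Log that a concurrent topic deletion is tearing down. The following hypothetical, self-contained Scala sketch shows the race and one defensive guard; Log, cleanupLogs and the containsKey re-check are illustrative names, not the actual LogManager code.

{code}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object RetentionRaceSketch {
  final class Log(val name: String) {
    def deleteOldSegments(): Unit = println(s"retention ran on $name")
  }

  private val logs = new ConcurrentHashMap[String, Log]()

  def cleanupLogs(): Unit = {
    for ((name, log) <- logs.asScala.toMap) { // iterate over a snapshot
      // Topic deletion may have removed the log after the snapshot was taken;
      // skip it instead of touching state that is being torn down.
      if (logs.containsKey(name)) log.deleteOldSegments()
    }
  }

  def main(args: Array[String]): Unit = {
    logs.put("test-0", new Log("test-0"))
    logs.remove("test-0") // the concurrent delete-topic path
    cleanupLogs()         // nothing to clean, and no NPE
  }
}
{code}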
[jira] [Created] (KAFKA-3562) Null Pointer Exception Found when delete topic and Using New Producer
Pengwei created KAFKA-3562: -- Summary: Null Pointer Exception Found when delete topic and Using New Producer Key: KAFKA-3562 URL: https://issues.apache.org/jira/browse/KAFKA-3562 Project: Kafka Issue Type: Bug Affects Versions: 0.9.0.1, 0.9.0.0 Reporter: Pengwei Priority: Minor Fix For: 0.10.0.1 Exception in thread “Thread-2” java.lang.NullPointerException at org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition(DefaultPartitioner.java:70) at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:687) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:432) at com.huawei.kafka.internal.remove.ProducerMsgThread.run(ProducerMsgThread.java:36) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
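The NPE in DefaultPartitioner.partition is consistent with the partition list for the deleted topic coming back null or empty while the producer still holds metadata for it. Below is a self-contained Scala sketch of a guard for that case; choosePartition is a hypothetical stand-in, not the real partitioner code.

{code}
object PartitionerGuardSketch {
  def choosePartition(key: Array[Byte],
                      partitionsForTopic: () => java.util.List[Integer]): Int = {
    val partitions = partitionsForTopic()
    if (partitions == null || partitions.isEmpty)
      throw new IllegalStateException("no partitions available (topic deleted?)")
    // Mask to a non-negative value before taking the modulus.
    (java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.size()
  }

  def main(args: Array[String]): Unit = {
    val live = java.util.Arrays.asList(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))
    println(choosePartition("k1".getBytes("UTF-8"), () => live))

    // Deleted-topic case: a clear error instead of a bare NullPointerException.
    try println(choosePartition("k1".getBytes("UTF-8"), () => null))
    catch { case e: IllegalStateException => println("guarded: " + e.getMessage) }
  }
}
{code}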
[jira] [Updated] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets
[ https://issues.apache.org/jira/browse/KAFKA-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-3075: --- Description: When using java api's commit offset : public void commitOffsets(MapoffsetsToCommit, boolean retryOnFailure); and pass a Java Hash Map to this interface, will found: java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) Test case testCommitNotExistTopicShoudThrowException OK. Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) The Origin Code: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure) } I try to fix like this, it is OK: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { import scala.collection.JavaConverters._ underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure) } was: When using java api's commit offset : public void commitOffsets(Map offsetsToCommit, boolean retryOnFailure); and pass a Java Hash Map to this interface, will found: java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) Test case testCommitNotExistTopicShoudThrowException OK. 
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) The Origin Code: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure) } I try to fix like this, it is OK: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { import scala.collection.JavaConverters._ underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure) } > java.util.HashMap cannot be cast to scala.collection.immutable.Map When using > ZookeeperConsumerConnector.commitOffsets > --- > > Key: KAFKA-3075 > URL: https://issues.apache.org/jira/browse/KAFKA-3075 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Pengwei >Assignee: Neha Narkhede > Fix For: 0.9.0.1 > > > When using java api's commit offset : > public void commitOffsets(Map > offsetsToCommit, boolean retryOnFailure); > and pass a Java Hash Map to this interface, will found: > java.lang.ClassCastException: java.util.HashMap cannot be cast to > scala.collection.immutable.Map > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) > at > kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) > at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) > Test case testCommitNotExistTopicShoudThrowException OK. > Exception in thread "main" java.lang.ClassCastException: java.util.HashMap > cannot be cast to scala.collection.immutable.Map > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) > at > kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) > at
[jira] [Updated] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets
[ https://issues.apache.org/jira/browse/KAFKA-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-3075: --- Description: When using java api's commit offset : public void commitOffsets(MapoffsetsToCommit, boolean retryOnFailure); and pass a Java Hash Map to this interface, will found: java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) Test case testCommitNotExistTopicShoudThrowException OK. Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) The Origin Code: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure) } I try to fix like this, it is OK: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { import scala.collection.JavaConverters._ underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure) } was: When using java api's commit offset : public void commitOffsets(Map offsetsToCommit, boolean retryOnFailure); and pass a Java Hash Map to this interface, will found: java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) Test case testCommitNotExistTopicShoudThrowException OK. 
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) The Origin Code: def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) { underlying.setConsumerRebalanceListener(consumerRebalanceListener) } I try to fix like this, it is OK: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { import scala.collection.JavaConverters._ underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure) } > java.util.HashMap cannot be cast to scala.collection.immutable.Map When using > ZookeeperConsumerConnector.commitOffsets > --- > > Key: KAFKA-3075 > URL: https://issues.apache.org/jira/browse/KAFKA-3075 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Pengwei >Assignee: Neha Narkhede > Fix For: 0.9.0.1 > > > When using java api's commit offset : > public void commitOffsets(Map > offsetsToCommit, boolean retryOnFailure); > and pass a Java Hash Map to this interface, will found: > java.lang.ClassCastException: java.util.HashMap cannot be cast to > scala.collection.immutable.Map > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) > at > kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) > at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) > Test case testCommitNotExistTopicShoudThrowException OK. > Exception in thread "main" java.lang.ClassCastException: java.util.HashMap > cannot be cast to scala.collection.immutable.Map > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) > at > kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) > at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) > The Origin Code: > def
[jira] [Created] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets
Pengwei created KAFKA-3075: -- Summary: java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets Key: KAFKA-3075 URL: https://issues.apache.org/jira/browse/KAFKA-3075 Project: Kafka Issue Type: Bug Components: consumer Reporter: Pengwei Assignee: Neha Narkhede Fix For: 0.9.0.1 When using the Java API's commit offset method: public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure); and passing a Java HashMap to this interface, we get: java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20) Test case testCommitNotExistTopicShoudThrowException OK. Exception in thread "main" java.lang.ClassCastException: java.util.HashMap cannot be cast to scala.collection.immutable.Map at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118) at kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95) at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22) The original code: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure) } I tried to fix it like this, and it works: def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { import scala.collection.JavaConverters._ underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure) } -- This message was sent by Atlassian JIRA (v6.3.4#6332)
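To make the cast failure and the converter-based fix above easy to reproduce outside Kafka, here is a self-contained Scala illustration that uses plain String keys in place of TopicAndPartition; CommitOffsetsCastSketch is illustrative only.

{code}
import scala.collection.JavaConverters._
import scala.collection.immutable

object CommitOffsetsCastSketch {
  def main(args: Array[String]): Unit = {
    val javaMap = new java.util.HashMap[String, java.lang.Long]()
    javaMap.put("topic-0", java.lang.Long.valueOf(42L))

    // Broken path: the asInstanceOf throws right here, because a java.util.HashMap
    // simply is not an instance of scala.collection.immutable.Map.
    try {
      val bad = javaMap.asInstanceOf[immutable.Map[String, java.lang.Long]]
      println("unexpected: " + bad.size) // never reached
    } catch { case e: ClassCastException => println("cast fails: " + e.getMessage) }

    // Fixed path (same shape as the patch above): convert, then copy to an immutable Map.
    val good: immutable.Map[String, java.lang.Long] = javaMap.asScala.toMap
    good.foreach { case (tp, off) => println(s"$tp -> $off") }
  }
}
{code}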
[jira] [Created] (KAFKA-2995) in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk
Pengwei created KAFKA-2995: -- Summary: in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk Key: KAFKA-2995 URL: https://issues.apache.org/jira/browse/KAFKA-2995 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.9.0.0 Reporter: Pengwei Assignee: Neha Narkhede Fix For: 0.9.1.0 In the 0.9.0.0 version, the Old Consumer's commit interface is as below: def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) { trace("OffsetMap: %s".format(offsetsToCommit)) var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit var done = false while (!done) { val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors if (offsetsToCommit.size > 0) { if (config.offsetsStorage == "zookeeper") { offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) => commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) } This interface does not validate the parameter offsetsToCommit: if offsetsToCommit contains a topic or partition that does not exist in Kafka, it will create an entry in the /consumers/[group]/offsets/[non-existent topic] directory. We should check that the topics and partitions in offsetsToCommit exist, or at least check that they are contained in the topicRegistry or checkpointedZkOffsets? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
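A small Scala sketch of the validation proposed in the last sentence (assumed shape, not the actual consumer code): partition the requested offsets into known and unknown entries against a topic registry before any ZooKeeper write, so no /consumers/[group]/offsets/[non-existent topic] node is ever created. The TopicAndPartition case class and the validate helper here are simplified stand-ins.

{code}
object CommitValidationSketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  def validate(offsetsToCommit: Map[TopicAndPartition, Long],
               topicRegistry: Map[String, Set[Int]])
      : (Map[TopicAndPartition, Long], Set[TopicAndPartition]) = {
    val (known, unknown) = offsetsToCommit.partition { case (tp, _) =>
      topicRegistry.get(tp.topic).exists(_.contains(tp.partition))
    }
    (known, unknown.keySet)
  }

  def main(args: Array[String]): Unit = {
    val registry = Map("orders" -> Set(0, 1))
    val toCommit = Map(TopicAndPartition("orders", 0) -> 10L,
                       TopicAndPartition("no-such-topic", 0) -> 5L)
    val (ok, rejected) = validate(toCommit, registry)
    println(s"commit: $ok")       // only the registered partition
    println(s"reject: $rejected") // would otherwise create a bogus ZK path
  }
}
{code}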
[jira] [Commented] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero
[ https://issues.apache.org/jira/browse/KAFKA-2903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15046561#comment-15046561 ] Pengwei commented on KAFKA-2903: Yes, we could also add a comment on it. But if, in a future version, someone needs to use this read API, they will get an error when reading from a sliced FileMessageSet, so isn't modifying the code better? It is only one line of code, while a comment would take many more words. > FileMessageSet's read method maybe has problem when start is not zero > - > > Key: KAFKA-2903 > URL: https://issues.apache.org/jira/browse/KAFKA-2903 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.1, 0.9.0.0 >Reporter: Pengwei >Assignee: Jay Kreps > Fix For: 0.9.1.0 > > > now the code is : > def read(position: Int, size: Int): FileMessageSet = { >. > new FileMessageSet(file, >channel, >start = this.start + position, >end = math.min(this.start + position + size, > sizeInBytes())) > } > if this.start is not 0, the end is clamped only to the FileMessageSet's size, not to the > actual absolute end position. > the end parameter should be: > end = math.min(this.start + position + size, this.start + sizeInBytes()) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero
[ https://issues.apache.org/jira/browse/KAFKA-2903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pengwei updated KAFKA-2903: --- Affects Version/s: 0.9.0.0 0.8.2.1 Fix Version/s: 0.9.0.0 > FileMessageSet's read method maybe has problem when start is not zero > - > > Key: KAFKA-2903 > URL: https://issues.apache.org/jira/browse/KAFKA-2903 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.1, 0.9.0.0 >Reporter: Pengwei >Assignee: Jay Kreps > Fix For: 0.9.0.0 > > > now the code is : > def read(position: Int, size: Int): FileMessageSet = { >. > new FileMessageSet(file, >channel, >start = this.start + position, >end = math.min(this.start + position + size, > sizeInBytes())) > } > if this.start is not 0, the end is only the FileMessageSet's size, not the > actually position of end position. > the end parameter should be: > end = math.min(this.start + position + size, this.start+sizeInBytes()) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero
Pengwei created KAFKA-2903: -- Summary: FileMessageSet's read method maybe has problem when start is not zero Key: KAFKA-2903 URL: https://issues.apache.org/jira/browse/KAFKA-2903 Project: Kafka Issue Type: Bug Components: log Reporter: Pengwei Assignee: Jay Kreps Now the code is: def read(position: Int, size: Int): FileMessageSet = { . new FileMessageSet(file, channel, start = this.start + position, end = math.min(this.start + position + size, sizeInBytes())) } If this.start is not 0, the end is clamped only to the FileMessageSet's size, not to the actual absolute end position. The end parameter should be: end = math.min(this.start + position + size, this.start + sizeInBytes()) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
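A tiny numeric illustration of the clamping bug and the proposed fix above, with no Kafka classes involved: for a slice that covers absolute bytes [100, 150), the existing formula clamps end to 50, an absolute position before the slice even begins, while the corrected formula clamps it to 150.

{code}
object SliceEndSketch {
  def buggyEnd(start: Int, position: Int, size: Int, sizeInBytes: Int): Int =
    math.min(start + position + size, sizeInBytes)          // forgets to offset by start

  def fixedEnd(start: Int, position: Int, size: Int, sizeInBytes: Int): Int =
    math.min(start + position + size, start + sizeInBytes)  // clamps in absolute terms

  def main(args: Array[String]): Unit = {
    // A slice covering absolute bytes [100, 150): start = 100, sizeInBytes = 50.
    println(buggyEnd(100, 0, 200, 50)) // 50  -> before the slice even starts
    println(fixedEnd(100, 0, 200, 50)) // 150 -> the real end of the slice
  }
}
{code}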