[jira] [Created] (KAFKA-7411) Change system to future and change back will make replication not working

2018-09-14 Thread Pengwei (JIRA)
Pengwei created KAFKA-7411:
--

 Summary: Change system to future and change back will make 
replication not working
 Key: KAFKA-7411
 URL: https://issues.apache.org/jira/browse/KAFKA-7411
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 2.0.0, 1.1.1
Reporter: Pengwei


When we change one of the followers' system time to a future time for a while and 
then change the system time back, we find that replication stops working.

 

This is because the replication thread needs to check, via

buildFetchRequest -> partitionFetchState.isReadyForFetch

whether the partition is ready to fetch. The DelayedItem's dueMs was set while the 
system time was in the future, so after the system time is changed back, the 
DelayedItem's getDelay function returns a very large delay:

def getDelay(unit: TimeUnit): Long = {
  unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
}

because dueMs is still a future time.
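
A minimal, self-contained sketch of the effect (this is not the real 
kafka.utils.DelayedItem; the one-day skew and 500 ms backoff are assumed values) 
showing why a delay armed while the clock was ahead never expires after the clock 
is corrected:

import java.util.concurrent.TimeUnit
import scala.math.max

object DelayedItemSketch {
  def main(args: Array[String]): Unit = {
    val skewMs    = 24L * 60 * 60 * 1000              // clock was one day ahead (assumed)
    val futureNow = System.currentTimeMillis() + skewMs
    val backoffMs = 500L                              // e.g. a fetch backoff (assumed)
    val dueMs     = futureNow + backoffMs             // armed against the skewed clock

    // After the clock is corrected, the remaining delay is computed against real time:
    def getDelay(unit: TimeUnit): Long =
      unit.convert(max(dueMs - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS)

    // Prints roughly one day instead of 500 ms, so isReadyForFetch stays false.
    println(s"remaining delay = ${getDelay(TimeUnit.MILLISECONDS)} ms")
  }
}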



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-5752) Delete topic and re-create topic immediate will delete the new topic's timeindex

2017-08-19 Thread Pengwei (JIRA)
Pengwei created KAFKA-5752:
--

 Summary: Delete topic and re-create topic immediate will delete 
the new topic's timeindex 
 Key: KAFKA-5752
 URL: https://issues.apache.org/jira/browse/KAFKA-5752
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0, 0.10.2.0
Reporter: Pengwei


When we delete a topic and immediately re-create a topic with the same name, we 
find that once the async topic deletion finishes, it also removes the newly 
created topic's time index.


This is because LogManager's asyncDelete changes the log's and the offset index's 
file pointers to the renamed files, but misses the time index, which causes this 
issue.
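
A hedged sketch of the rename step in an asyncDelete-style routine (the 
SegmentFiles type and renameForAsyncDelete name are illustrative, not the actual 
kafka.log.LogManager API); the point is that all three files must be repointed to 
the renamed copies:

import java.io.File

object AsyncDeleteRenameSketch {
  final case class SegmentFiles(var log: File, var offsetIndex: File, var timeIndex: File)

  def renameForAsyncDelete(seg: SegmentFiles, deletedSuffix: String = ".deleted"): Unit = {
    def rename(f: File): File = {
      val renamed = new File(f.getPath + deletedSuffix)
      f.renameTo(renamed)   // ignoring the result for brevity
      renamed
    }
    seg.log = rename(seg.log)
    seg.offsetIndex = rename(seg.offsetIndex)
    // The reported bug: this last repointing was missing, so the in-memory time
    // index still referenced the original path, which by then belonged to the
    // re-created topic and was deleted together with the old segment.
    seg.timeIndex = rename(seg.timeIndex)
  }
}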



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5553) Delete topic failed to change from OnlineReplica to ReplicaDeletionStarted if ISR not created

2017-07-04 Thread Pengwei (JIRA)
Pengwei created KAFKA-5553:
--

 Summary: Delete topic failed to change from OnlineReplica to 
ReplicaDeletionStarted  if ISR not created
 Key: KAFKA-5553
 URL: https://issues.apache.org/jira/browse/KAFKA-5553
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.11.0.0, 0.10.2.0, 0.9.0.1
Reporter: Pengwei


We found the following error log, and the topic could not be removed for a long time:

[2016-07-11 20:17:52,965] ERROR Controller 1328 epoch 315 initiated state 
change of replica 1328 for partition [websocket_test_topic,0] from 
OnlineReplica to ReplicaDeletionStarted failed (state.change.logger)
java.lang.AssertionError: assertion failed: Replica 
[Topic=websocket_test_topic,Partition=0,Replica=1328] should be in the 
OfflineReplica states before moving to ReplicaDeletionStarted state. Instead it 
is in OnlineReplica state
at scala.Predef$.assert(Predef.scala:165)
at 
kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
at 
kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
at 
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at 
kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
at 
kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
at 
kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5425) Kafka replication support all log segment files sync with leader

2017-06-09 Thread Pengwei (JIRA)
Pengwei created KAFKA-5425:
--

 Summary: Kafka replication support all log segment files sync with 
leader
 Key: KAFKA-5425
 URL: https://issues.apache.org/jira/browse/KAFKA-5425
 Project: Kafka
  Issue Type: Improvement
Reporter: Pengwei
Priority: Minor


Currently Kafka replication only supports the follower syncing with the leader 
from its latest fetch offset, which assumes that messages with offsets smaller 
than this fetch offset are already in sync. After catching up with the leader, 
the follower advances the fetch offset to the log end offset.

But if log segments whose latest offset is smaller than the current fetch offset 
are deleted from the follower, replication will not sync these log segments back. 
In this case there is a risk of losing messages: if the follower becomes the 
leader, a consumer reading from the beginning to the end will find that some 
messages are lost.

Can we improve the Kafka replication mechanism to ensure that all of a follower's 
log segments are the same as the leader's? Even if some log segments are deleted, 
the follower would try to recover them from the leader.
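
A minimal sketch of the kind of check this would need (a hypothetical helper, not 
an existing Kafka API): compare the leader's and follower's segment base offsets 
below the current fetch offset and report the segments the follower would have to 
re-fetch:

object MissingSegmentsSketch {
  // Base offsets of leader segments (below the fetch offset) that the follower
  // no longer has and would need to fetch again from the leader.
  def missingSegments(leaderBaseOffsets: Set[Long],
                      followerBaseOffsets: Set[Long],
                      fetchOffset: Long): Set[Long] =
    leaderBaseOffsets.filter(base => base < fetchOffset && !followerBaseOffsets.contains(base))

  def main(args: Array[String]): Unit = {
    // e.g. the follower lost the segments starting at offsets 0 and 1000
    println(missingSegments(Set(0L, 1000L, 2000L), Set(2000L), fetchOffset = 2500L))
  }
}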



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2017-04-17 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-4862:
---
Status: Patch Available  (was: Open)

> Kafka client connect to a shutdown node will block for a long time
> --
>
> Key: KAFKA-4862
> URL: https://issues.apache.org/jira/browse/KAFKA-4862
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0, 0.9.0.0
>Reporter: Pengwei
>Assignee: Pengwei
> Fix For: 0.11.0.0
>
>
> Currently in our test env, we found after one of the broker node crash(reboot 
> or os crash), the client maybe still connecting to the crash node to send 
> metadata request or other request, and it need about several  minutes to 
> aware the connection is timeout then try another node to connect to send the 
> request.  Then the client may still not aware the metadata change after 
> several minutes.
> We don't have a connection timeout for the network client, we should add a 
> connection timeout for the client



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5014) SSL Channel not ready but tcp is established and the server is hung will not sending metadata

2017-04-05 Thread Pengwei (JIRA)
Pengwei created KAFKA-5014:
--

 Summary: SSL Channel not ready but tcp is established and the 
server is hung will not sending metadata
 Key: KAFKA-5014
 URL: https://issues.apache.org/jira/browse/KAFKA-5014
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0, 0.9.0.1
Reporter: Pengwei
Priority: Minor


In our test env, QA hung one of the brokers the producer was connected to; the 
producer then got stuck in the send method and threw the exception: failed to 
update metadata after the request timeout.

I found the reason as follows: when the producer chose a broker to send the 
metadata request to, it connected to that broker, but the broker was hung. The 
TCP connection was established and the network client marked this broker as 
connected, but the SSL handshake had not completed yet, so the channel was not 
ready.

Then the network client chose this connected node in leastLoadedNode every time 
to send the metadata request, but the node's channel was never ready.

So the producer got stuck fetching metadata and never tried another node to 
request metadata from. The client should not get stuck when only one node is hung.
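
A hedged sketch of the node-selection idea (illustrative types, not the actual 
org.apache.kafka.clients.NetworkClient.leastLoadedNode implementation): a node 
should only be preferred when its channel is actually ready, e.g. the SSL 
handshake has finished, not merely TCP-connected, otherwise metadata requests 
keep going to a node that can never serve them:

object NodeSelectionSketch {
  final case class NodeState(id: String, tcpConnected: Boolean, channelReady: Boolean, inFlight: Int)

  // Pick the usable node with the fewest in-flight requests; a node that is only
  // TCP-connected but whose channel is not ready does not count as usable.
  def leastLoadedReadyNode(nodes: Seq[NodeState]): Option[NodeState] = {
    val usable = nodes.filter(n => n.tcpConnected && n.channelReady)
    if (usable.nonEmpty) Some(usable.minBy(_.inFlight))
    else None // caller should then try connecting to a different node
  }
}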



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4790) Kafka cannot recover after a disk full

2017-03-10 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904333#comment-15904333
 ] 

Pengwei edited comment on KAFKA-4790 at 3/10/17 10:54 AM:
--

[~junrao] If KIP-98 can fix this issue, we can wait for KIP-98. There is 
a way to avoid this issue: we should make sure that
log.segment.bytes / log.index.interval.bytes <= log.index.size.max.bytes / 8
where 8 bytes is the size of each index entry.

In our case, we should set log.segment.bytes to 512 MB instead of 1 GB, after 
setting log.index.size.max.bytes to 1 MB.
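
A quick check of this constraint with the values from this report (assumed for 
illustration, not general guidance); it shows why a 1 GB segment can overflow an 
index capped at 1024000 bytes:

object SegmentSizeConstraintCheck {
  def main(args: Array[String]): Unit = {
    val indexSizeMaxBytes  = 1024000L   // log.index.size.max.bytes as set here
    val indexEntryBytes    = 8L         // size of one offset-index entry
    val indexIntervalBytes = 4096L      // log.index.interval.bytes default

    val maxEntries      = indexSizeMaxBytes / indexEntryBytes   // 128000, the size reported in the error
    val maxSegmentBytes = maxEntries * indexIntervalBytes       // 524288000, roughly 500 MB
    // log.segment.bytes should stay at or below this for the inequality to hold.
    println(s"index capacity = $maxEntries entries, max segment size = $maxSegmentBytes bytes")
  }
}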


was (Author: pengwei):
[~junrao] If KIP-98 can fix this issue, we can wait for the KIP-98.There is 
a method to avoid this issue, we should make sure the 
log.segment.bytes / log.index.interval.bytes >= log.index.size.max.bytes / 8
8 bytes is every index item size

In our case, we should set log.segment.bytes to 512MB instead of 1G, after set 
log.index.size.max.bytes to 1MB

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full

2017-03-09 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904333#comment-15904333
 ] 

Pengwei commented on KAFKA-4790:


[~junrao] If KIP-98 can fix this issue, we can wait for the KIP-98.There is 
a method to avoid this issue, we should make sure the 
log.segment.bytes / log.index.interval.bytes >= log.index.size.max.bytes / 8
8 bytes is every index item size

In our case, we should set log.segment.bytes to 512MB instead of 1G, after set 
log.index.size.max.bytes to 1MB

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2017-03-07 Thread Pengwei (JIRA)
Pengwei created KAFKA-4862:
--

 Summary: Kafka client connect to a shutdown node will block for a 
long time
 Key: KAFKA-4862
 URL: https://issues.apache.org/jira/browse/KAFKA-4862
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0, 0.9.0.0
Reporter: Pengwei
Assignee: Pengwei
 Fix For: 0.11.0.0


Currently in our test env, we found that after one of the broker nodes crashes 
(reboot or OS crash), the client may still be connecting to the crashed node to 
send metadata or other requests, and it takes about several minutes for it to 
notice that the connection has timed out and to try another node for the request. 
So the client may still be unaware of the metadata change after several minutes.

We don't have a connection timeout for the network client; we should add a 
connection timeout for the client.
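
A hedged sketch of the idea behind such a client-side connection setup timeout 
(the names are illustrative, not the actual org.apache.kafka.clients.NetworkClient 
API): track when each connect attempt started and treat it as failed once it 
exceeds a configurable limit, so requests move on to another node instead of 
waiting for the OS-level TCP timeout:

import scala.collection.mutable

class ConnectionSetupTimeoutTracker(connectionSetupTimeoutMs: Long) {
  private val connectStartMs = mutable.Map.empty[String, Long]

  def markConnecting(nodeId: String, nowMs: Long): Unit = connectStartMs.put(nodeId, nowMs)
  def markConnected(nodeId: String): Unit = connectStartMs.remove(nodeId)

  // Nodes whose connect attempt has been pending longer than the timeout; the
  // client would disconnect these and pick another node for the next request.
  def timedOutNodes(nowMs: Long): Seq[String] =
    connectStartMs.collect { case (node, start) if nowMs - start > connectionSetupTimeoutMs => node }.toSeq
}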



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full

2017-02-23 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880567#comment-15880567
 ] 

Pengwei commented on KAFKA-4790:


The index max file size is set to 1 MB:
log.index.size.max.bytes=1024000


The reason I found is below:

1. The producer batches a lot of messages into Kafka (for example 512 KB per 
write), so every write covers more than 4 KB (the index write interval) and adds 
only one index entry; a full segment ends up with, for example, about 2050 index 
entries.


2. At the same time the disk becomes full, and Kafka dies before the recovery 
point is flushed to disk.

3. On restart, the recovery function re-checks every message and re-appends the 
index entries into the index file, writing one entry per 4 KB of messages. The 
total number of index entries then far exceeds 2050 and may exceed the index 
file's maximum number of entries.
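
A small sketch contrasting the index-entry counts in normal operation and during 
recovery, using the batch size from this report as an assumed example; the 
recovery count is what trips the "Attempt to append to a full index (size = 
128000)" assertion quoted in this issue:

object RecoveryEntryCountSketch {
  def main(args: Array[String]): Unit = {
    val segmentBytes       = 1024L * 1024 * 1024   // a 1 GB segment
    val batchBytes         = 512L * 1024           // producer appends ~512 KB at a time (assumed)
    val indexIntervalBytes = 4096L                 // log.index.interval.bytes default

    // Normal operation: at most one index entry per append, since each append > 4 KB.
    val entriesNormal   = segmentBytes / batchBytes          // ~2048
    // Recovery: one entry roughly every indexIntervalBytes of messages.
    val entriesRecovery = segmentBytes / indexIntervalBytes  // ~262144, far above the 128000 cap

    println(s"normal = $entriesNormal entries, recovery = $entriesRecovery entries")
  }
}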

> Kafka cannot recover after a disk full
> -
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
> Fix For: 0.10.2.1
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4790) Kafka cannot recover after a disk full

2017-02-23 Thread Pengwei (JIRA)
Pengwei created KAFKA-4790:
--

 Summary: Kafka cannot recover after a disk full
 Key: KAFKA-4790
 URL: https://issues.apache.org/jira/browse/KAFKA-4790
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1, 0.9.0.1
Reporter: Pengwei
 Fix For: 0.10.2.1


[2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
[2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
(kafka.log.Log)
[2017-02-23 18:43:59,297] ERROR There was an error in one of the threads during 
logs loading: java.lang.IllegalArgumentException: requirement failed: Attempt 
to append to a full index (size = 128000). (kafka.log.LogManager)
[2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. Prepare 
to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Attempt to append to a 
full index (size = 128000).
at scala.Predef$.require(Predef.scala:219)
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
at kafka.log.LogSegment.recover(LogSegment.scala:191)
at kafka.log.Log.recoverLog(Log.scala:259)
at kafka.log.Log.loadSegments(Log.scala:234)
at kafka.log.Log.(Log.scala:92)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-01-10 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15817467#comment-15817467
 ] 

Pengwei edited comment on KAFKA-3042 at 1/11/17 7:27 AM:
-

[~lindong]  It seems I still encounter this issue after applying your patch in 
our test environment.
The log keeps printing the message "Cached zkVersion " and it cannot recover.

The reproduction steps we used are to cut off the three brokers' network for some 
time and then restore it; afterwards we found one of the brokers keeps printing 
the cached zkVersion log.


was (Author: pengwei):
[~lindong]  It seems I still encounter this issue after apply your patch in our 
test enviroment
The log is also logging the message "Cached zkVersion " and cannot recover

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.10.2.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker consider itself as the leader in fact it's 
> a follower.
> So after several failed tries, it need to find out who is the leader



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-01-10 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei reopened KAFKA-3042:


> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.10.2.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker consider itself as the leader in fact it's 
> a follower.
> So after several failed tries, it need to find out who is the leader



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2017-01-10 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15817467#comment-15817467
 ] 

Pengwei commented on KAFKA-3042:


[~lindong]  It seems I still encounter this issue after apply your patch in our 
test enviroment
The log is also logging the message "Cached zkVersion " and cannot recover

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.10.2.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker consider itself as the leader in fact it's 
> a follower.
> So after several failed tries, it need to find out who is the leader



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-09 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811007#comment-15811007
 ] 

Pengwei commented on KAFKA-4595:


One method is to acquire the lock with a timeout instead of blocking in 
deleteTopicStopReplicaCallback. If it times out, this call fails and the 
RequestSendThread can be stopped after this round. The effect is as if the broker 
had received the stop-replica request but did not return the result to the 
controller; if the broker recovers, it can receive the stop-replica request again 
and respond to the controller.
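
A hedged sketch of that workaround (an illustrative helper, not the actual 
kafka.utils.CoreUtils.inLock or the controller code): the callback running on the 
send thread tries the controller lock with a timeout instead of blocking forever, 
so it cannot deadlock with the broker-change handler that holds the lock while 
waiting for the send thread to shut down:

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object ControllerLockSketch {
  // Run body under the lock if it can be acquired within timeoutMs, otherwise give up.
  def inLockWithTimeout[T](lock: ReentrantLock, timeoutMs: Long)(body: => T): Option[T] =
    if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      try Some(body) finally lock.unlock()
    } else {
      None // give up this round; the stop-replica callback can be retried later
    }
}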

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803816#comment-15803816
 ] 

Pengwei commented on KAFKA-4595:


[~huxi_2b]

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>   at 
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803812#comment-15803812
 ] 

Pengwei commented on KAFKA-4595:


Yes, you are right.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15801343#comment-15801343
 ] 

Pengwei commented on KAFKA-4595:


The broker change event is triggered to remove the dead broker 1003. It holds the 
controller lock and needs to stop the Controller-1001-to-broker-1003-send-thread, 
but that send thread is calling deleteTopicStopReplicaCallback, which also tries 
to acquire the controller lock, so the two operations deadlock with each other.
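
A minimal sketch of this lock cycle (illustrative names, not the real controller 
classes), shown as the two operations that block each other:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock

object ControllerDeadlockSketch {
  val controllerLock = new ReentrantLock()
  val sendThreadDone = new CountDownLatch(1)

  // ZK event thread: broker-change handler removing dead broker 1003.
  def brokerChangeHandler(): Unit = {
    controllerLock.lock()
    try sendThreadDone.await()   // waits forever: the send thread cannot finish
    finally controllerLock.unlock()
  }

  // Send thread: the stop-replica callback for the in-flight request.
  def stopReplicaCallback(): Unit = {
    controllerLock.lock()        // waits forever: the event thread holds the lock
    try { /* update topic-deletion state here */ }
    finally controllerLock.unlock()
    sendThreadDone.countDown()
  }
}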

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at 

[jira] [Created] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)
Pengwei created KAFKA-4595:
--

 Summary: Controller send thread can't stop when broker change 
listener event trigger for  dead brokers
 Key: KAFKA-4595
 URL: https://issues.apache.org/jira/browse/KAFKA-4595
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1, 0.9.0.0
Reporter: Pengwei
 Fix For: 0.10.2.0


In our test env, we found the controller was not working after a delete-topic 
operation and a network issue; the stack is below:


"ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
#15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
condition [0x7fb76b7c8000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc05497b8> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at 
kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
at 
kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
at 
kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
- locked <0xc0258760> (a java.lang.Object)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

   Locked ownable synchronizers:
- <0xc02587f8> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)



"Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc02587f8> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at 

[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event

2016-11-26 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-4229:
---
Status: Patch Available  (was: In Progress)

PR : https://github.com/apache/kafka/pull/2175

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.9.0.0
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found the controller not started after several zk expired event in our 
> test environment.  By analysing the log, I found the controller will handle 
> the ephemeral node data delete event first and then the zk expired event , 
> then the controller will gone.
> I can reproducer it on my develop env:
> 1. set up a one broker and one zk env, specify a very large zk timeout (20s)
> 2. stop the broker and remove the zk's /broker/ids/0  directory
> 3. restart the broker and make  a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. after the /controller node gone the breakpoint will hit.
> 5. expired the current session(suspend the send thread) and create a new 
> session s2
> 6. resume the event thread, then the controller will handle 
> LeaderChangeListener.handleDataDeleted  and become leader
> 7. then controller will handle SessionExpirationListener.handleNewSession, it 
> resign the controller and elect,  but when elect it found the /controller 
> node is exist and not become the leader.  But the /controller node is created 
> by current session s2 will not remove. So the controller is gone



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-4229) Controller can't start after several zk expired event

2016-11-26 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4229 started by Pengwei.
--
> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found the controller not started after several zk expired event in our 
> test environment.  By analysing the log, I found the controller will handle 
> the ephemeral node data delete event first and then the zk expired event , 
> then the controller will gone.
> I can reproducer it on my develop env:
> 1. set up a one broker and one zk env, specify a very large zk timeout (20s)
> 2. stop the broker and remove the zk's /broker/ids/0  directory
> 3. restart the broker and make  a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. after the /controller node gone the breakpoint will hit.
> 5. expired the current session(suspend the send thread) and create a new 
> session s2
> 6. resume the event thread, then the controller will handle 
> LeaderChangeListener.handleDataDeleted  and become leader
> 7. then controller will handle SessionExpirationListener.handleNewSession, it 
> resign the controller and elect,  but when elect it found the /controller 
> node is exist and not become the leader.  But the /controller node is created 
> by current session s2 will not remove. So the controller is gone



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event

2016-11-26 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-4229:
---
Reviewer: Guozhang Wang

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found the controller not started after several zk expired event in our 
> test environment.  By analysing the log, I found the controller will handle 
> the ephemeral node data delete event first and then the zk expired event , 
> then the controller will gone.
> I can reproducer it on my develop env:
> 1. set up a one broker and one zk env, specify a very large zk timeout (20s)
> 2. stop the broker and remove the zk's /broker/ids/0  directory
> 3. restart the broker and make  a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. after the /controller node gone the breakpoint will hit.
> 5. expired the current session(suspend the send thread) and create a new 
> session s2
> 6. resume the event thread, then the controller will handle 
> LeaderChangeListener.handleDataDeleted  and become leader
> 7. then controller will handle SessionExpirationListener.handleNewSession, it 
> resign the controller and elect,  but when elect it found the /controller 
> node is exist and not become the leader.  But the /controller node is created 
> by current session s2 will not remove. So the controller is gone



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event

2016-11-26 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15698992#comment-15698992
 ] 

Pengwei commented on KAFKA-4229:


The PR is : https://github.com/apache/kafka/pull/2175

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found the controller not started after several zk expired event in our 
> test environment.  By analysing the log, I found the controller will handle 
> the ephemeral node data delete event first and then the zk expired event , 
> then the controller will gone.
> I can reproducer it on my develop env:
> 1. set up a one broker and one zk env, specify a very large zk timeout (20s)
> 2. stop the broker and remove the zk's /broker/ids/0  directory
> 3. restart the broker and make  a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. after the /controller node gone the breakpoint will hit.
> 5. expired the current session(suspend the send thread) and create a new 
> session s2
> 6. resume the event thread, then the controller will handle 
> LeaderChangeListener.handleDataDeleted  and become leader
> 7. then controller will handle SessionExpirationListener.handleNewSession, it 
> resign the controller and elect,  but when elect it found the /controller 
> node is exist and not become the leader.  But the /controller node is created 
> by current session s2 will not remove. So the controller is gone



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event

2016-10-13 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571265#comment-15571265
 ] 

Pengwei commented on KAFKA-4229:


We tested it on 0.9.0.0, but I found that the controller code is nearly the 
same across these versions.
In 0.9.0.0, the zk version is 3.4.6.

> Controller can't start after several zk expired event
> -
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>Assignee: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node's data-deleted event first 
> and the zk session expiration event afterwards, after which the controller 
> is gone.
> I can reproduce it in my development environment:
> 1. Set up an environment with one broker and one zk, and specify a very 
> large zk session timeout (20s).
> 2. Stop the broker and remove zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes the leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: 
> it resigns as controller and re-elects, but during the election it finds 
> that the /controller node already exists, so it does not become the leader. 
> However, that /controller node was created by the current session s2 and 
> will not be removed, so the cluster is left without a controller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4229) Controller can't start after several zk expired event

2016-09-29 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532241#comment-15532241
 ] 

Pengwei commented on KAFKA-4229:


I already have a patch for this issue; could it be assigned to me?

> Controller can't start after several zk expired event
> --
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node's data-deleted event first 
> and the zk session expiration event afterwards, after which the controller 
> is gone.
> I can reproduce it in my development environment:
> 1. Set up an environment with one broker and one zk, and specify a very 
> large zk session timeout (20s).
> 2. Stop the broker and remove zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes the leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: 
> it resigns as controller and re-elects, but during the election it finds 
> that the /controller node already exists, so it does not become the leader. 
> However, that /controller node was created by the current session s2 and 
> will not be removed, so the cluster is left without a controller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event

2016-09-29 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-4229:
---
Fix Version/s: 0.10.1.0

> Controller can't start after several zk expired event
> --
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node's data-deleted event first 
> and the zk session expiration event afterwards, after which the controller 
> is gone.
> I can reproduce it in my development environment:
> 1. Set up an environment with one broker and one zk, and specify a very 
> large zk session timeout (20s).
> 2. Stop the broker and remove zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes the leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: 
> it resigns as controller and re-elects, but during the election it finds 
> that the /controller node already exists, so it does not become the leader. 
> However, that /controller node was created by the current session s2 and 
> will not be removed, so the cluster is left without a controller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4229) Controller can't start after several zk expired event

2016-09-29 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-4229:
---
Fix Version/s: (was: 0.10.1.0)

> Controller can't start after several zk expired event
> --
>
> Key: KAFKA-4229
> URL: https://issues.apache.org/jira/browse/KAFKA-4229
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Pengwei
>
> We found that the controller did not start after several zk session 
> expiration events in our test environment. By analysing the log, I found 
> that the controller handles the ephemeral node's data-deleted event first 
> and the zk session expiration event afterwards, after which the controller 
> is gone.
> I can reproduce it in my development environment:
> 1. Set up an environment with one broker and one zk, and specify a very 
> large zk session timeout (20s).
> 2. Stop the broker and remove zk's /broker/ids/0 directory.
> 3. Restart the broker and set a breakpoint in the zk client's event thread 
> to queue the delete event.
> 4. After the /controller node is gone, the breakpoint is hit.
> 5. Expire the current session (suspend the send thread) and create a new 
> session s2.
> 6. Resume the event thread; the controller then handles 
> LeaderChangeListener.handleDataDeleted and becomes the leader.
> 7. The controller then handles SessionExpirationListener.handleNewSession: 
> it resigns as controller and re-elects, but during the election it finds 
> that the /controller node already exists, so it does not become the leader. 
> However, that /controller node was created by the current session s2 and 
> will not be removed, so the cluster is left without a controller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4229) Controller can't start after several zk expired event

2016-09-29 Thread Pengwei (JIRA)
Pengwei created KAFKA-4229:
--

 Summary: Controller can't start after several zk expired event
 Key: KAFKA-4229
 URL: https://issues.apache.org/jira/browse/KAFKA-4229
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.9.0.0
Reporter: Pengwei


We found that the controller did not start after several zk session expiration 
events in our test environment. By analysing the log, I found that the 
controller handles the ephemeral node's data-deleted event first and the zk 
session expiration event afterwards, after which the controller is gone.
I can reproduce it in my development environment:
1. Set up an environment with one broker and one zk, and specify a very large 
zk session timeout (20s).
2. Stop the broker and remove zk's /broker/ids/0 directory.
3. Restart the broker and set a breakpoint in the zk client's event thread to 
queue the delete event.
4. After the /controller node is gone, the breakpoint is hit.
5. Expire the current session (suspend the send thread) and create a new 
session s2.
6. Resume the event thread; the controller then handles 
LeaderChangeListener.handleDataDeleted and becomes the leader.
7. The controller then handles SessionExpirationListener.handleNewSession: it 
resigns as controller and re-elects, but during the election it finds that the 
/controller node already exists, so it does not become the leader. However, 
that /controller node was created by the current session s2 and will not be 
removed, so the cluster is left without a controller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1429) Yet another deadlock in controller shutdown

2016-07-23 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390887#comment-15390887
 ] 

Pengwei commented on KAFKA-1429:


Hi Jun, I have opened a PR for this issue. Can you review it? Thanks!

> Yet another deadlock in controller shutdown
> ---
>
> Key: KAFKA-1429
> URL: https://issues.apache.org/jira/browse/KAFKA-1429
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
> Attachments: kafka_0.9.0.0_controller_dead_lock.patch
>
>
> Found one more case of deadlock in controller during shutdown:
> {code}
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57 state=TIMED_WAITING
> - waiting on <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> - locked <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Locked synchronizers: count = 1
>   - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a
> kafka-scheduler-0 id=172 state=WAITING
> - waiting on <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - locked <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>  owned by 
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
> at kafka.utils.Utils$.inLock(Utils.scala:536)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1108)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1108)
> at 
> 

[jira] [Created] (KAFKA-3913) Old consumer's metrics error when using IPv6

2016-06-28 Thread Pengwei (JIRA)
Pengwei created KAFKA-3913:
--

 Summary: Old consumer's metrics error when using IPv6 
 Key: KAFKA-3913
 URL: https://issues.apache.org/jira/browse/KAFKA-3913
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.0, 0.9.0.1
Reporter: Pengwei
Priority: Minor
 Fix For: 0.10.1.0


The error is below:

[2016-05-09 15:49:20,096] WARN Property security.protocol is not valid 
(kafka.utils.VerifiableProperties)
[2016-05-09 15:49:20,882] WARN Error processing 
kafka.server:type=FetcherStats,name=RequestsPerSec,clientId=console-consumer-32775,brokerHost=fe80::92e2:baff:fe07:51cc,brokerPort=9093
 (com.yammer.metrics.reporting.JmxReporter)
javax.management.MalformedObjectNameException: Invalid character ':' in value 
part of property
at javax.management.ObjectName.construct(ObjectName.java:618)
at javax.management.ObjectName.(ObjectName.java:1382)
at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
at 
com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
at com.yammer.metrics.core.MetricsRegistry.newMeter(MetricsRegistry.java:240)
at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:79)
at kafka.server.FetcherStats.newMeter(AbstractFetcherThread.scala:264)
at kafka.server.FetcherStats.(AbstractFetcherThread.scala:269)
at kafka.server.AbstractFetcherThread.(AbstractFetcherThread.scala:55)
at kafka.consumer.ConsumerFetcherThread.(ConsumerFetcherThread.scala:38)
at 
kafka.consumer.ConsumerFetcherManager.createFetcherThread(ConsumerFetcherManager.scala:118)
at 
kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:83)
at 
kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at 
kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
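
For illustration, a minimal standalone sketch of the failure and of one 
possible workaround (this is not the Kafka fix; the MBean domain and keys 
below are only copied from the log line above): a raw IPv6 address contains 
':' characters, which are not legal in an unquoted ObjectName property value, 
so the value has to be quoted (or the colons replaced) before the MBean name 
is built.

{code}
import javax.management.ObjectName

object Ipv6MetricNameSketch extends App {
  val broker = "fe80::92e2:baff:fe07:51cc"

  // Fails with javax.management.MalformedObjectNameException, as in the log above.
  try new ObjectName(s"kafka.server:type=FetcherStats,brokerHost=$broker")
  catch { case e: Exception => println(s"unquoted value fails: $e") }

  // ObjectName.quote() escapes the value, so the ':' characters are accepted.
  val ok = new ObjectName(
    s"kafka.server:type=FetcherStats,brokerHost=${ObjectName.quote(broker)}")
  println(s"quoted value works: $ok")
}
{code}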




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1429) Yet another deadlock in controller shutdown

2016-06-17 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-1429:
---
Attachment: kafka_0.9.0.0_controller_dead_lock.patch

The uploaded file is a patch to fix this bug. Can somebody review it?

> Yet another deadlock in controller shutdown
> ---
>
> Key: KAFKA-1429
> URL: https://issues.apache.org/jira/browse/KAFKA-1429
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
> Attachments: kafka_0.9.0.0_controller_dead_lock.patch
>
>
> Found one more case of deadlock in controller during shutdown:
> {code}
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57 state=TIMED_WAITING
> - waiting on <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> - locked <0x288a66ec> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Locked synchronizers: count = 1
>   - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a
> kafka-scheduler-0 id=172 state=WAITING
> - waiting on <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - locked <0x22b9b31a> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>  owned by 
> ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181
>  id=57
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
> at kafka.utils.Utils$.inLock(Utils.scala:536)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1108)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at 
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1108)
> at 
> 

[jira] [Commented] (KAFKA-1429) Yet another deadlock in controller shutdown

2016-06-15 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15331523#comment-15331523
 ] 

Pengwei commented on KAFKA-1429:


I can still reproduce this issue in 0.9.0.0 and 0.9.0.1. When the controller's 
session expires, the expiration handler holds the controllerLock and waits for 
the autoRebalanceScheduler thread to shut down, but the autoRebalanceScheduler 
is running the partition rebalance task and that thread wants the 
controllerLock (a minimal sketch of this lock ordering follows the two stack 
traces below):

"ZkClient-EventThread-18-9.94.1.21:2181,9.94.1.22:2181,9.94.1.23:2181/oms-cluster-1"
 #18 daemon prio=5 os_prio=0 tid=0x00c3f800 nid=0x49be waiting on 
condition [0x7f466603d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xd9eaace0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)
at 
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:370)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1171)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1170)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1170)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at 
kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1170)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:734)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)


"kafka-scheduler-12" #143 daemon prio=5 os_prio=0 tid=0x7f465c4cc000 
nid=0x566f waiting on condition [0x7f466260e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc0a9a480> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
at 
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1198)
at 
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1194)
at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1194)
at 
kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:344)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
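
As referenced above, a minimal, self-contained sketch of the lock ordering 
(illustrative only, not Kafka code): one thread takes a lock and then waits 
for a scheduled task to finish, while that task is itself blocked trying to 
take the same lock.

{code}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object ControllerShutdownDeadlockSketch extends App {
  val controllerLock = new ReentrantLock()
  val scheduler = Executors.newScheduledThreadPool(1)

  // Plays the role of checkAndTriggerPartitionRebalance: it needs controllerLock.
  scheduler.schedule(new Runnable {
    def run(): Unit = {
      controllerLock.lock()          // blocks while the other thread holds the lock
      try println("rebalance ran") finally controllerLock.unlock()
    }
  }, 1, TimeUnit.SECONDS)

  // Plays the role of handleNewSession -> onControllerResignation ->
  // KafkaScheduler.shutdown, all executed while holding controllerLock.
  controllerLock.lock()
  try {
    Thread.sleep(2000)               // let the rebalance task start and block
    scheduler.shutdown()
    // Times out here because the task is waiting for the lock we hold; with a
    // long or unbounded wait (as in the stack traces above) this is effectively
    // a deadlock.
    val finished = scheduler.awaitTermination(10, TimeUnit.SECONDS)
    println(s"scheduler terminated cleanly: $finished")
  } finally controllerLock.unlock()
}
{code}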

> Yet another deadlock in controller shutdown
> ---
>
> Key: KAFKA-1429
> URL: https://issues.apache.org/jira/browse/KAFKA-1429
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Dmitry Bugaychenko
>Assignee: Neha Narkhede
>
> Found one 

[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller

2016-05-08 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275879#comment-15275879
 ] 

Pengwei commented on KAFKA-3585:


Did you use kafka-server-stop.sh to shut down?

> Shutdown slow when there is only one broker which is controller
> ---
>
> Key: KAFKA-3585
> URL: https://issues.apache.org/jira/browse/KAFKA-3585
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Pengwei
>Assignee: Taiyuan Zhang
>Priority: Minor
> Fix For: 0.10.0.1
>
>
> Reproduction steps:
> 1. Install a 3-broker cluster
> 2. Create a topic with 3 partitions
> 3. Shut down the brokers one by one; you will find that the last one shuts 
> down very slowly because of this error:
> [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to 
> move: 
> __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2
>  (kafka.server.KafkaServer)
> [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 
> 0 (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown 
> after the previous attempt failed... (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean 
> shutdown as all the controlled shutdown attempts failed 
> (kafka.server.KafkaServer)
> This is determined by:
> controlled.shutdown.retry.backoff.ms=5000
> controlled.shutdown.max.retries=3
> It is slow because the last broker cannot elect a new leader for the 
> remaining partitions. The last broker could be improved to shut down 
> quickly; we can skip the controlled-shutdown error when it is the last broker.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-3585) Shutdown slow when there is only one broker which is controller

2016-05-08 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-3585:
---
Comment: was deleted

(was: do you used the kafka-server-stop.sh to shutdown?)

> Shutdown slow when there is only one broker which is controller
> ---
>
> Key: KAFKA-3585
> URL: https://issues.apache.org/jira/browse/KAFKA-3585
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Pengwei
>Assignee: Taiyuan Zhang
>Priority: Minor
> Fix For: 0.10.0.1
>
>
> Reproduction steps:
> 1. Install a 3-broker cluster
> 2. Create a topic with 3 partitions
> 3. Shut down the brokers one by one; you will find that the last one shuts 
> down very slowly because of this error:
> [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to 
> move: 
> __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2
>  (kafka.server.KafkaServer)
> [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 
> 0 (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown 
> after the previous attempt failed... (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean 
> shutdown as all the controlled shutdown attempts failed 
> (kafka.server.KafkaServer)
> This is determined by:
> controlled.shutdown.retry.backoff.ms=5000
> controlled.shutdown.max.retries=3
> It is slow because the last broker cannot elect a new leader for the 
> remaining partitions. The last broker could be improved to shut down 
> quickly; we can skip the controlled-shutdown error when it is the last broker.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller

2016-05-08 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275880#comment-15275880
 ] 

Pengwei commented on KAFKA-3585:


Did you use kafka-server-stop.sh to shut down?

> Shutdown slow when there is only one broker which is controller
> ---
>
> Key: KAFKA-3585
> URL: https://issues.apache.org/jira/browse/KAFKA-3585
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Pengwei
>Assignee: Taiyuan Zhang
>Priority: Minor
> Fix For: 0.10.0.1
>
>
> Reproduction steps:
> 1. Install a 3-broker cluster
> 2. Create a topic with 3 partitions
> 3. Shut down the brokers one by one; you will find that the last one shuts 
> down very slowly because of this error:
> [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to 
> move: 
> __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2
>  (kafka.server.KafkaServer)
> [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 
> 0 (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown 
> after the previous attempt failed... (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean 
> shutdown as all the controlled shutdown attempts failed 
> (kafka.server.KafkaServer)
> This is determined by:
> controlled.shutdown.retry.backoff.ms=5000
> controlled.shutdown.max.retries=3
> It is slow because the last broker cannot elect a new leader for the 
> remaining partitions. The last broker could be improved to shut down 
> quickly; we can skip the controlled-shutdown error when it is the last broker.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3585) Shutdown slow when there is only one broker which is controller

2016-05-03 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268171#comment-15268171
 ] 

Pengwei commented on KAFKA-3585:


Does the last broker become the controller?
I can reproduce this issue nearly every time.

> Shutdown slow when there is only one broker which is controller
> ---
>
> Key: KAFKA-3585
> URL: https://issues.apache.org/jira/browse/KAFKA-3585
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Pengwei
>Assignee: Taiyuan Zhang
>Priority: Minor
> Fix For: 0.10.0.1
>
>
> Reproduction steps:
> 1. Install a 3-broker cluster
> 2. Create a topic with 3 partitions
> 3. Shut down the brokers one by one; you will find that the last one shuts 
> down very slowly because of this error:
> [2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to 
> move: 
> __consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2
>  (kafka.server.KafkaServer)
> [2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 
> 0 (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown 
> after the previous attempt failed... (kafka.server.KafkaServer)
> [2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean 
> shutdown as all the controlled shutdown attempts failed 
> (kafka.server.KafkaServer)
> This is determined by:
> controlled.shutdown.retry.backoff.ms=5000
> controlled.shutdown.max.retries=3
> It is slow because the last broker cannot elect a new leader for the 
> remaining partitions. The last broker could be improved to shut down 
> quickly; we can skip the controlled-shutdown error when it is the last broker.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3585) Shutdown slow when there is only one broker which is controller

2016-04-19 Thread Pengwei (JIRA)
Pengwei created KAFKA-3585:
--

 Summary: Shutdown slow when there is only one broker which is 
controller
 Key: KAFKA-3585
 URL: https://issues.apache.org/jira/browse/KAFKA-3585
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.9.0.1
Reporter: Pengwei
Priority: Minor
 Fix For: 0.10.0.1


Reproduction steps:
1. Install a 3-broker cluster

2. Create a topic with 3 partitions

3. Shut down the brokers one by one; you will find that the last one shuts 
down very slowly because of this error:
[2016-04-19 20:30:19,168] INFO [Kafka Server 1], Remaining partitions to move: 
__consumer_offsets-48,__consumer_offsets-13,__consumer_offsets-46,__consumer_offsets-11,__consumer_offsets-44,__consumer_offsets-42,__consumer_offsets-21,__consumer_offsets-19,__consumer_offsets-32,__consumer_offsets-30,__consumer_offsets-28,__consumer_offsets-26,__consumer_offsets-7,__consumer_offsets-40,__consumer_offsets-38,__consumer_offsets-36,__consumer_offsets-1,__consumer_offsets-34,__consumer_offsets-16,__consumer_offsets-45,__consumer_offsets-14,__consumer_offsets-12,__consumer_offsets-41,__consumer_offsets-10,__consumer_offsets-24,__consumer_offsets-22,__consumer_offsets-20,__consumer_offsets-49,__consumer_offsets-18,__consumer_offsets-31,__consumer_offsets-0,test2-0,__consumer_offsets-27,__consumer_offsets-39,__consumer_offsets-8,__consumer_offsets-37,__consumer_offsets-6,__consumer_offsets-4,__consumer_offsets-2
 (kafka.server.KafkaServer)
[2016-04-19 20:30:19,169] INFO [Kafka Server 1], Error code from controller: 0 
(kafka.server.KafkaServer)
[2016-04-19 20:30:24,169] WARN [Kafka Server 1], Retrying controlled shutdown 
after the previous attempt failed... (kafka.server.KafkaServer)
[2016-04-19 20:30:24,171] WARN [Kafka Server 1], Proceeding to do an unclean 
shutdown as all the controlled shutdown attempts failed 
(kafka.server.KafkaServer)

This is determined by:
controlled.shutdown.retry.backoff.ms=5000
controlled.shutdown.max.retries=3


It is slow because the last broker cannot elect a new leader for the remaining 
partitions. The last broker could be improved to shut down quickly; we can 
skip the controlled-shutdown error when it is the last broker.
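
For reference: each retry is preceded by a controlled.shutdown.retry.backoff.ms 
wait (the 20:30:19 -> 20:30:24 gap in the log above is one such interval), so 
with the values quoted here the broker can spend up to roughly 
controlled.shutdown.max.retries * 5000 ms = 15 s backing off, on top of the 
failed attempts themselves, before it falls back to the unclean shutdown.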



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3584) NullPointer Exception found when Delete Topic and Log delete concurrent

2016-04-19 Thread Pengwei (JIRA)
Pengwei created KAFKA-3584:
--

 Summary: NullPointer Exception found when Delete Topic and Log 
delete concurrent
 Key: KAFKA-3584
 URL: https://issues.apache.org/jira/browse/KAFKA-3584
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 0.9.0.1
Reporter: Pengwei
Assignee: Jay Kreps
Priority: Minor
 Fix For: 0.10.0.1


[2016-03-19 17:23:45,760] ERROR Uncaught exception in scheduled task 
'kafka-log-retention' (kafka.utils.KafkaScheduler)
java.lang.NullPointerException
at kafka.log.Log.activeSegment(Log.scala:824)
at kafka.log.Log.deleteOldSegments(Log.scala:602)
at kafka.log.LogManager.kafka$log$LogManager
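
For illustration, a minimal standalone sketch of the suspected race (not Kafka 
code; Kafka's Log keeps its segments in a similar concurrent sorted map): the 
retention task reads the active segment as the last entry of the map while 
topic deletion empties the map concurrently, so lastEntry() returns null and 
the caller dereferences it.

{code}
import java.util.concurrent.ConcurrentSkipListMap

object DeleteVsRetentionSketch extends App {
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(java.lang.Long.valueOf(0L), "segment-0")

  // NullPointerException if the map has been emptied in the meantime.
  def activeSegment: String = segments.lastEntry.getValue

  segments.clear()                   // simulates the log being deleted concurrently
  try println(activeSegment)
  catch { case e: NullPointerException => println(s"retention task would die with: $e") }
}
{code}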



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3562) Null Pointer Exception Found when delete topic and Using New Producer

2016-04-15 Thread Pengwei (JIRA)
Pengwei created KAFKA-3562:
--

 Summary: Null Pointer Exception Found when delete topic and Using 
New Producer
 Key: KAFKA-3562
 URL: https://issues.apache.org/jira/browse/KAFKA-3562
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1, 0.9.0.0
Reporter: Pengwei
Priority: Minor
 Fix For: 0.10.0.1


Exception in thread "Thread-2" java.lang.NullPointerException
at 
org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition(DefaultPartitioner.java:70)
at 
org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:687)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:432)
at 
com.huawei.kafka.internal.remove.ProducerMsgThread.run(ProducerMsgThread.java:36)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets

2016-01-06 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-3075:
---
Description: 
When using the Java API's commit offset method:

 public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> 
offsetsToCommit, boolean retryOnFailure);

and passing a Java HashMap to this interface, you will get:

java.lang.ClassCastException: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
Test case testCommitNotExistTopicShoudThrowException OK.
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
cannot be cast to scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)

The original code:
 def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {

underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition,
 OffsetAndMetadata]], retryOnFailure)
  }
I tried to fix it like this, and it works:
  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {
import scala.collection.JavaConverters._

underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure)
  }


  was:
When using java api's commit offset :

 public void commitOffsets(Map 
offsetsToCommit, boolean retryOnFailure);

and pass a  Java Hash Map to this interface, will found:

java.lang.ClassCastException: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
Test case testCommitNotExistTopicShoudThrowException OK.
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
cannot be cast to scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)

The Origin Code:
 def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {

underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition,
 OffsetAndMetadata]], retryOnFailure)
  }
I try to fix like this, it is OK:
  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {
import scala.collection.JavaConverters._
underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure)
  }



> java.util.HashMap cannot be cast to scala.collection.immutable.Map When using 
>  ZookeeperConsumerConnector.commitOffsets
> ---
>
> Key: KAFKA-3075
> URL: https://issues.apache.org/jira/browse/KAFKA-3075
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Pengwei
>Assignee: Neha Narkhede
> Fix For: 0.9.0.1
>
>
> When using the Java API's commit offset method:
>  public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> 
> offsetsToCommit, boolean retryOnFailure);
> and passing a Java HashMap to this interface, you will get:
> java.lang.ClassCastException: java.util.HashMap cannot be cast to 
> scala.collection.immutable.Map
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
> at 
> kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
> at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
> Test case testCommitNotExistTopicShoudThrowException OK.
> Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
> cannot be cast to scala.collection.immutable.Map
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
> at 
> kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
> at 

[jira] [Updated] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets

2016-01-06 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-3075:
---
Description: 
When using the Java API's commit offset method:

 public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> 
offsetsToCommit, boolean retryOnFailure);

and passing a Java HashMap to this interface, you will get:

java.lang.ClassCastException: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
Test case testCommitNotExistTopicShoudThrowException OK.
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
cannot be cast to scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)

The original code:
 def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {

underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition,
 OffsetAndMetadata]], retryOnFailure)
  }
I tried to fix it like this, and it works:
  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {
import scala.collection.JavaConverters._
underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure)
  }


  was:
When using java api's commit offset :

 public void commitOffsets(Map 
offsetsToCommit, boolean retryOnFailure);

and pass a  Java Hash Map to this interface, will found:

java.lang.ClassCastException: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
Test case testCommitNotExistTopicShoudThrowException OK.
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
cannot be cast to scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)

The Origin Code:
def setConsumerRebalanceListener(consumerRebalanceListener: 
ConsumerRebalanceListener) {
underlying.setConsumerRebalanceListener(consumerRebalanceListener)
  }

I try to fix like this, it is OK:
  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {
import scala.collection.JavaConverters._
underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure)
  }



> java.util.HashMap cannot be cast to scala.collection.immutable.Map When using 
>  ZookeeperConsumerConnector.commitOffsets
> ---
>
> Key: KAFKA-3075
> URL: https://issues.apache.org/jira/browse/KAFKA-3075
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Pengwei
>Assignee: Neha Narkhede
> Fix For: 0.9.0.1
>
>
> When using the Java API's commit offset method:
>  public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> 
> offsetsToCommit, boolean retryOnFailure);
> and passing a Java HashMap to this interface, you will get:
> java.lang.ClassCastException: java.util.HashMap cannot be cast to 
> scala.collection.immutable.Map
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
> at 
> kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
> at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
> Test case testCommitNotExistTopicShoudThrowException OK.
> Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
> cannot be cast to scala.collection.immutable.Map
> at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
> at 
> kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
> at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)
> The Origin Code:
>  def 

[jira] [Created] (KAFKA-3075) java.util.HashMap cannot be cast to scala.collection.immutable.Map When using ZookeeperConsumerConnector.commitOffsets

2016-01-06 Thread Pengwei (JIRA)
Pengwei created KAFKA-3075:
--

 Summary: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map When using  
ZookeeperConsumerConnector.commitOffsets
 Key: KAFKA-3075
 URL: https://issues.apache.org/jira/browse/KAFKA-3075
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Pengwei
Assignee: Neha Narkhede
 Fix For: 0.9.0.1


When using the Java API's commit offset method:

 public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> 
offsetsToCommit, boolean retryOnFailure);

and passing a Java HashMap to this interface, you will get:

java.lang.ClassCastException: java.util.HashMap cannot be cast to 
scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitNotExistTopicShoudThrowException(CommitExceptionTest.java:55)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:20)
Test case testCommitNotExistTopicShoudThrowException OK.
Exception in thread "main" java.lang.ClassCastException: java.util.HashMap 
cannot be cast to scala.collection.immutable.Map
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:118)
at 
kafka.examples.CommitExceptionTest.testCommitOffsetOutOfRange(CommitExceptionTest.java:95)
at kafka.examples.CommitExceptionTest.main(CommitExceptionTest.java:22)

The original code:
def setConsumerRebalanceListener(consumerRebalanceListener: 
ConsumerRebalanceListener) {
underlying.setConsumerRebalanceListener(consumerRebalanceListener)
  }

I tried to fix it like this, and it works:
  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, 
OffsetAndMetadata], retryOnFailure: Boolean) {
import scala.collection.JavaConverters._
underlying.commitOffsets(offsetsToCommit.asScala.toMap, retryOnFailure)
  }
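
For illustration, a standalone sketch (not Kafka code) of why the original 
cast fails at runtime while the converter-based version works: a 
java.util.HashMap is not a scala.collection.immutable.Map, so the asInstanceOf 
checkcast throws, whereas asScala.toMap copies the entries into a real 
immutable Scala Map.

{code}
import scala.collection.JavaConverters._

object CastVsConvertSketch extends App {
  val jmap = new java.util.HashMap[String, Integer]()
  jmap.put("partition-0", 42)

  try {
    // Throws ClassCastException right here: generic parameters are erased,
    // but the checkcast against scala.collection.immutable.Map still fails.
    val bad = jmap.asInstanceOf[Map[String, Integer]]
    println(bad.size)
  } catch { case e: ClassCastException => println(s"cast fails: $e") }

  val good: Map[String, Integer] = jmap.asScala.toMap   // safe copy
  println(s"converted copy works: ${good("partition-0")}")
}
{code}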




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2995) in 0.9.0.0 Old Consumer's commitOffsets with specify partition can submit not exists topic and partition to zk

2015-12-16 Thread Pengwei (JIRA)
Pengwei created KAFKA-2995:
--

 Summary: in 0.9.0.0 Old Consumer's commitOffsets with specify 
partition can submit not exists topic and partition to zk
 Key: KAFKA-2995
 URL: https://issues.apache.org/jira/browse/KAFKA-2995
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.9.0.0
Reporter: Pengwei
Assignee: Neha Narkhede
 Fix For: 0.9.1.0


In version 0.9.0.0, the old consumer's commit interface is as follows:

def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, 
OffsetAndMetadata], isAutoCommit: Boolean) {
trace("OffsetMap: %s".format(offsetsToCommit))
var retriesRemaining = 1 + (if (isAutoCommit) 0 else 
config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
var done = false
while (!done) {
  val committed = offsetsChannelLock synchronized {
// committed when we receive either no error codes or only 
MetadataTooLarge errors
if (offsetsToCommit.size > 0) {
  if (config.offsetsStorage == "zookeeper") {
offsetsToCommit.foreach { case (topicAndPartition, 
offsetAndMetadata) =>
  commitOffsetToZooKeeper(topicAndPartition, 
offsetAndMetadata.offset)
}
  

This interface does not validate the offsetsToCommit parameter: if 
offsetsToCommit contains a topic or partition that does not exist in Kafka, an 
entry is still created under the /consumers/[group]/offsets/[non-existent 
topic] directory.

Should we check that each topic and partition in offsetsToCommit actually 
exists, or just check that it is contained in topicRegistry or 
checkpointedZkOffsets?
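
For illustration, a minimal sketch of such a guard (the helper name, its 
placement, and the way the owned-partition set would be obtained, e.g. from 
topicRegistry, are all hypothetical; only the kafka.common types come from the 
old consumer API):

{code}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}

object CommitOffsetsGuardSketch {
  // Hypothetical validation step: refuse to commit offsets for partitions the
  // consumer does not actually own, before anything is written to zookeeper.
  def validatedOffsets(
      offsetsToCommit: Map[TopicAndPartition, OffsetAndMetadata],
      ownedPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, OffsetAndMetadata] = {
    val (known, unknown) = offsetsToCommit.partition { case (tp, _) => ownedPartitions(tp) }
    if (unknown.nonEmpty)
      throw new IllegalArgumentException(
        "commitOffsets called for unknown topic/partitions: " + unknown.keys.mkString(", "))
    known
  }
}
{code}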



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero

2015-12-08 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15046561#comment-15046561
 ] 

Pengwei commented on KAFKA-2903:


Yes, we could also just add a comment on it.
But if, in a future version, someone needs to use this read API with a sliced 
FileMessageSet, they will get an error, so wouldn't modifying the code be better?
It is only one line of code, whereas a comment would take many more words.

> FileMessageSet's read method maybe has problem when start is not zero
> -
>
> Key: KAFKA-2903
> URL: https://issues.apache.org/jira/browse/KAFKA-2903
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.1, 0.9.0.0
>Reporter: Pengwei
>Assignee: Jay Kreps
> Fix For: 0.9.1.0
>
>
> Now the code is:
> def read(position: Int, size: Int): FileMessageSet = {
>. 
> new FileMessageSet(file,
>channel,
>start = this.start + position,
>end = math.min(this.start + position + size, 
> sizeInBytes()))
>   }
> If this.start is not 0, the computed end is capped at this FileMessageSet's 
> size rather than at the actual end position within the file.
> The end parameter should be:
>  end = math.min(this.start + position + size, this.start+sizeInBytes())



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero

2015-11-27 Thread Pengwei (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pengwei updated KAFKA-2903:
---
Affects Version/s: 0.9.0.0
   0.8.2.1
Fix Version/s: 0.9.0.0

> FileMessageSet's read method maybe has problem when start is not zero
> -
>
> Key: KAFKA-2903
> URL: https://issues.apache.org/jira/browse/KAFKA-2903
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.1, 0.9.0.0
>Reporter: Pengwei
>Assignee: Jay Kreps
> Fix For: 0.9.0.0
>
>
> Now the code is:
> def read(position: Int, size: Int): FileMessageSet = {
>. 
> new FileMessageSet(file,
>channel,
>start = this.start + position,
>end = math.min(this.start + position + size, 
> sizeInBytes()))
>   }
> If this.start is not 0, the computed end is capped at this FileMessageSet's 
> size rather than at the actual end position within the file.
> The end parameter should be:
>  end = math.min(this.start + position + size, this.start+sizeInBytes())



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2903) FileMessageSet's read method maybe has problem when start is not zero

2015-11-27 Thread Pengwei (JIRA)
Pengwei created KAFKA-2903:
--

 Summary: FileMessageSet's read method maybe has problem when start 
is not zero
 Key: KAFKA-2903
 URL: https://issues.apache.org/jira/browse/KAFKA-2903
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: Pengwei
Assignee: Jay Kreps


Now the code is:
def read(position: Int, size: Int): FileMessageSet = {
   . 
new FileMessageSet(file,
   channel,
   start = this.start + position,
   end = math.min(this.start + position + size, 
sizeInBytes()))
  }

If this.start is not 0, the computed end is capped at this FileMessageSet's 
size rather than at the actual end position within the file.
The end parameter should be:
 end = math.min(this.start + position + size, this.start+sizeInBytes())
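
A small worked example of the effect (the numbers are made up): take a sliced 
FileMessageSet with start = 100 and end = 300, so sizeInBytes() is 200. 
Reading near the end of the slice then produces an end value smaller than the 
new start, while the proposed fix keeps the result inside the slice:

{code}
object SlicedReadEndSketch extends App {
  val start = 100            // this.start of the slice
  val slicedSize = 200       // sizeInBytes() of the slice (end - start)
  val position = 150         // read the last 50 bytes of the slice...
  val size = 100             // ...asking for up to 100 bytes

  val newStart   = start + position                                      // 250
  val currentEnd = math.min(start + position + size, slicedSize)         // 200, smaller than newStart
  val fixedEnd   = math.min(start + position + size, start + slicedSize) // 300, a valid 50-byte read

  println(s"newStart=$newStart currentEnd=$currentEnd fixedEnd=$fixedEnd")
}
{code}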





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)