[jira] [Commented] (KAFKA-6537) Duplicate consumers after consumer group rebalance

2018-02-07 Thread Michael Golovanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356606#comment-16356606
 ] 

Michael Golovanov commented on KAFKA-6537:
--

This issue seems to be a duplicate of 
[KAFKA-5430|https://issues.apache.org/jira/browse/KAFKA-5430]

> Duplicate consumers after consumer group rebalance
> --
>
> Key: KAFKA-6537
> URL: https://issues.apache.org/jira/browse/KAFKA-6537
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Michael Golovanov
>Priority: Critical
>
> *_Deployment description_*
> Kafka brokers are deployed on ten (10) nodes. The Zookeeper cluster has 
> seven (7) nodes. The Kafka brokers share bare-metal hosts with the Zookeeper 
> nodes.
> The broker/Zookeeper hosts run Redhat 7 with Java 8. Host names 
> are grid48, grid237 and grid251.
> We have one topic with six (6) partitions. Kafka consumers are deployed on 
> three (3) hosts. Each host has two (2) consumers. All consumers belong to a 
> single group.
>  
> *_Error description_*
> After all consumers started, the topic's partitions were balanced evenly:
> grid237 owns partitions 0,1 (0 - consumer thread-0, 1 - consumer thread-1)
> grid251 owns partitions 2,3 (2 - consumer thread-0, 3 - consumer thread-1)
> grid48 owns partitions 4,5 (4 - consumer thread-0, 5 - consumer thread-1)
> After some time we see chaotic revocations and assignments of partitions 
> between brokers, and then the log shows all partitions assigned to one 
> consumer on one node, grid48.
>  
> In reality, however, all partitions are read not only by the thread-1 
> consumer but also by thread-0 on grid48, so every message from the topic 
> partitions is duplicated. Consumer thread-0 tries to commit its message 
> offsets and gets a commit error, while the thread-1 consumer commits its 
> offsets successfully.
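
One way to picture the commit behaviour reported above is a toy model in which the group coordinator accepts offset commits only from members of the current group generation. This is a sketch under that assumption, not a diagnosis of this issue and not Kafka's implementation; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a coordinator that fences offset commits by generation.
// All names here are hypothetical and do not come from the Kafka code base.
final class GenerationFencingSketch {
    private int generation = 0;
    private final Map<Integer, Long> committed = new HashMap<>();

    // A rebalance starts a new generation; members learn it only by rejoining.
    int rebalance() {
        return ++generation;
    }

    // Accept the commit only if the caller's generation is the current one.
    boolean commit(int memberGeneration, int partition, long offset) {
        if (memberGeneration != generation) {
            return false; // stale member: the commit is rejected
        }
        committed.put(partition, offset);
        return true;
    }

    Long committedOffset(int partition) {
        return committed.get(partition);
    }

    public static void main(String[] args) {
        GenerationFencingSketch group = new GenerationFencingSketch();
        int thread0Generation = group.rebalance(); // thread-0 joined here
        int thread1Generation = group.rebalance(); // only thread-1 rejoined later
        System.out.println("thread-0 commit accepted: "
                + group.commit(thread0Generation, 0, 42L));
        System.out.println("thread-1 commit accepted: "
                + group.commit(thread1Generation, 0, 42L));
    }
}
```

In this model a consumer that keeps fetching with a stale generation can still read records, but its commits fail, which matches the symptom of duplicated reads with one failing committer.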



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6537) Duplicate consumers after consumer group rebalance

2018-02-07 Thread Michael Golovanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356606#comment-16356606
 ] 

Michael Golovanov edited comment on KAFKA-6537 at 2/8/18 7:46 AM:
--

This issue seems to be a duplicate of KAFKA-5430


was (Author: mgolovanov):
Seems this issue duplicate of 
[KAFKA-5430|https://issues.apache.org/jira/browse/KAFKA-5430]






[jira] [Created] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread

2018-02-07 Thread Anh Le (JIRA)
Anh Le created KAFKA-6541:
-

 Summary: StackOverflow exceptions in thread 
'kafka-coordinator-heartbeat-thread
 Key: KAFKA-6541
 URL: https://issues.apache.org/jira/browse/KAFKA-6541
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
 Environment: Linux
Reporter: Anh Le


There's something wrong with our client library when sending heartbeats. This 
bug seems to be identical to this one: 
[http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E]

 

Here's the log:

 

{{2018-02-08 13:55:01,102 ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | default-group':}}
{{java.lang.StackOverflowError: null}}
{{ at java.lang.StringBuilder.append(StringBuilder.java:136)}}
{{ at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)}}
{{ at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)}}
{{ at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)}}
{{ at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)}}
{{ at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)}}
{{ at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)}}
{{ at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)}}
{{ at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)}}
{{ at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)}}
{{ at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)}}
{{ at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)}}
{{ at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)}}
{{ at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)}}
{{ at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)}}
{{ at ch.qos.logback.classic.Logger.info(Logger.java:583)}}
{{ at org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}}
{{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}}
{{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}}
{{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}}
{{ at 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the 
OfflineReplica state, possibly because a broker went offline, the call stack 
below occurs on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    _inside a for-loop for each partition_
    ReplicaStateMachine.doHandleStateChanges
    ReplicaStateMachine.handleStateChanges
    KafkaController.onReplicasBecomeOffline
    KafkaController.onBrokerFailure
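
To make the cost concrete, the iteration pattern in the call stack above can be modeled with a small Java sketch. It only counts how many partitions are visited; the class and method names are hypothetical, and the real controller code is Scala, so treat this as an illustration of the complexity argument rather than the controller's logic.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the iteration pattern; names are hypothetical.
final class MetadataBatchSketch {
    // Behaviour as described: one call per affected partition, and every call
    // also walks the whole partitions-to-be-deleted set.
    static long visitsWithFullScan(List<String> affected, List<String> toBeDeleted) {
        long visits = 0;
        for (String partition : affected) {
            visits++; // the partition that was actually passed in
            for (String retained : toBeDeleted) {
                visits++; // full scan repeated on every call
            }
        }
        return visits;
    }

    // Proposed behaviour: each call touches only the partitions it was given.
    static long visitsWithoutScan(List<String> affected) {
        return affected.size();
    }

    // Builds placeholder partition names such as "a0-0" .. "a0-9".
    static List<String> partitions(String topic, int count) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(topic + "-" + i);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> a0 = partitions("a0", 10); // retained for deletion
        List<String> a1 = partitions("a1", 10); // affected by the broker failure
        System.out.println("with full scan: " + visitsWithFullScan(a1, a0));
        System.out.println("without scan:   " + visitsWithoutScan(a1));
    }
}
```

With 10 affected partitions and 10 retained ones, the first count is 10 * (1 + 10) = 110 and the second is 10, so the extra work grows with the product of the two set sizes.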

How to reproduce the problem:
 1. Create a cluster with 2 brokers having id 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. _Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill broker 2 so that the replicas on broker 2 are transitioned to the 
OfflineReplica state on the controller.
 8. Verify that the log message in step 4 appears another *210* times. This is 
because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to the OfflineReplica state. That in turn generates 100 (10 x 10) entries of 
the log message above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to the OfflineReplica state. This 
again generates 100 (10 x 10) entries of the log message above.
c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 entries, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n log messages could be generated.
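
The counts in steps 6 and 8 follow directly from the two formulas above. The short Java sketch below only restates that arithmetic so the numbers from the reproduction (100 and 210) can be checked for other values of m and n; the method names are hypothetical.

```java
// Arithmetic from the reproduction steps; names are hypothetical.
final class LogCountSketch {
    // Step 6: a topic with m partitions is created while n partitions are
    // retained in partitionsToBeDeleted.
    static long onTopicCreate(long m, long n) {
        return m * n;
    }

    // Step 8: controlled shutdown (m * n), broker failure handling (m * n),
    // and the final sendUpdateMetadataRequest call (n). This is an upper bound.
    static long onBrokerOffline(long m, long n) {
        return 2 * m * n + n;
    }

    public static void main(String[] args) {
        long m = 10; // partitions of topic a1
        long n = 10; // retained partitions of topic a0
        System.out.println("topic create:   " + onTopicCreate(m, n));
        System.out.println("broker offline: " + onBrokerOffline(m, n));
    }
}
```

For m = n = 10 this gives 100 and 210, matching the counts observed in controller.log.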

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the 
OfflineReplica state, possibly because a broker went offline, the call stack 
below occurs on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    _inside a for-loop for each partition_
    ReplicaStateMachine.doHandleStateChanges
    ReplicaStateMachine.handleStateChanges
    KafkaController.onReplicasBecomeOffline
    KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having id 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. _Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill broker 2 so that the replicas on broker 2 are transitioned to the 
OfflineReplica state on the controller.
 8. Verify that the log message in step 4 appears another *210* times. This is 
because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to the OfflineReplica state. That in turn generates 100 (10 x 10) entries of 
the log message above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to the OfflineReplica state. This 
again generates 100 (10 x 10) entries of the log message above.
c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 entries, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, 2 * m * 
n + n log messages will be generated.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the 
OfflineReplica state, possibly because a broker went offline, the call stack 
below occurs on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    _inside a for-loop for each partition_
    ReplicaStateMachine.doHandleStateChanges
    ReplicaStateMachine.handleStateChanges
    KafkaController.onReplicasBecomeOffline
    KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having id 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. _Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_
initializeLeaderAndIsrForPartitions


 7. Kill broker 2 so that the replicas on broker 2 are transitioned to the 
OfflineReplica state on the controller.
 8. Verify that the log message in step 4 appears another *210* times. This is 
because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to the OfflineReplica state. That in turn generates 100 (10 x 10) entries of 
the log message above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to the OfflineReplica state. This 
again generates 100 (10 x 10) entries of the log message above.
c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which 
generates 10 entries, one for each partition in the a0 topic.

According to the analysis in step 8, when we have n partitions accumulated in 
the variable partitionsToBeDeleted, and a broker with m partitions becomes 
offline, 2 * m * n + n of the log messages from step 4 will be generated.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has 

[jira] [Commented] (KAFKA-5327) Console Consumer should only poll for up to max messages

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356375#comment-16356375
 ] 

ASF GitHub Bot commented on KAFKA-5327:
---

huxihx opened a new pull request #4546: KAFKA-5327: Console Consumer should 
only poll for up to max messages
URL: https://github.com/apache/kafka/pull/4546
 
 
   Add a check to ensure that --max-messages, if set, is no smaller than 
max.poll.records.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Console Consumer should only poll for up to max messages
> 
>
> Key: KAFKA-5327
> URL: https://issues.apache.org/jira/browse/KAFKA-5327
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Dustin Cote
>Assignee: huxihx
>Priority: Minor
> Fix For: 1.2.0
>
>
> The ConsoleConsumer has a --max-messages flag that can be used to limit the 
> number of messages consumed. However, the number of records actually consumed 
> is governed by max.poll.records. This means you see one message on the 
> console, but your offset has moved forward a default of 500, which is kind of 
> counterintuitive. It would be good to only commit offsets for messages we 
> have printed to the console.
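
The mismatch described above can be illustrated with a toy model: one poll fetches up to max.poll.records records, only --max-messages of them are printed, and the rest are passed over without being shown. The sketch also includes the kind of check the pull request describes. It is not ConsoleConsumer code, and all class and method names are hypothetical.

```java
// Toy model of the --max-messages vs. max.poll.records mismatch.
// Names are hypothetical; this is not the ConsoleConsumer implementation.
final class MaxMessagesSketch {
    // Records returned by one poll: bounded by max.poll.records.
    static int fetched(int available, int maxPollRecords) {
        return Math.min(available, maxPollRecords);
    }

    // Records printed to the console: bounded by --max-messages.
    static int printed(int fetchedRecords, int maxMessages) {
        return Math.min(fetchedRecords, maxMessages);
    }

    // Records the position moved past without printing them.
    static int unprinted(int available, int maxPollRecords, int maxMessages) {
        int got = fetched(available, maxPollRecords);
        return got - printed(got, maxMessages);
    }

    // The check described in the pull request: reject a --max-messages value
    // that is smaller than max.poll.records.
    static void validate(int maxMessages, int maxPollRecords) {
        if (maxMessages < maxPollRecords) {
            throw new IllegalArgumentException(
                "--max-messages must be no smaller than max.poll.records");
        }
    }

    public static void main(String[] args) {
        int available = 1000;     // records waiting in the partition
        int maxPollRecords = 500; // the default mentioned in the description
        int maxMessages = 1;
        System.out.println("printed:   "
                + printed(fetched(available, maxPollRecords), maxMessages));
        System.out.println("unprinted: "
                + unprinted(available, maxPollRecords, maxMessages));
    }
}
```

With --max-messages set to 1 and the default of 500, one record is printed while 499 fetched records are never shown, which is the counterintuitive offset jump the issue describes.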





[jira] [Commented] (KAFKA-2423) Introduce Scalastyle

2018-02-07 Thread Ray Chiang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356374#comment-16356374
 ] 

Ray Chiang commented on KAFKA-2423:
---

Hmmm...it looks like my PR has a different default text.  Please let me know 
how to adjust this for the next time.

Some quick notes on the PR:
 * Kept Grant's earlier Gradle file setup
 * Updated plugin to version 0.9.0
 * Removed some calls from the previous PR since that's now in the main 
build.gradle
 * Put the output into core/build/scalastyle/scalastyle_report.xml and set 
quiet = true, which keeps the terminal output to a minimum
 * Set failOnViolation = false to keep Scalastyle warnings from being flagged 
as an error by Gradle.

> Introduce Scalastyle
> 
>
> Key: KAFKA-2423
> URL: https://issues.apache.org/jira/browse/KAFKA-2423
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ray Chiang
>Priority: Major
>
> This is similar to Checkstyle (which we already use), but for Scala:
> http://www.scalastyle.org/





[jira] [Commented] (KAFKA-2423) Introduce Scalastyle

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356371#comment-16356371
 ] 

ASF GitHub Bot commented on KAFKA-2423:
---

TolerableCoder opened a new pull request #4545: KAFKA-2423: Introduce Scalastyle
URL: https://github.com/apache/kafka/pull/4545
 
 









[jira] [Commented] (KAFKA-6405) Fix incorrect comment in MetadataUpdater

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356357#comment-16356357
 ] 

ASF GitHub Bot commented on KAFKA-6405:
---

guozhangwang closed pull request #4361: KAFKA-6405:Fix incorrect comment in 
MetadataUpdater
URL: https://github.com/apache/kafka/pull/4361
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index cb821d6a860..126728342d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -54,7 +54,7 @@
 long maybeUpdate(long now);
 
 /**
- * If `request` is a metadata request, handles it and return `true`. Otherwise, returns `false`.
+ * Handle disconnections for metadata requests.
  *
  * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
  * requests with special handling for disconnections of such requests.
@@ -70,7 +70,7 @@
 void handleAuthenticationFailure(AuthenticationException exception);
 
 /**
- * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
+ * Handle responses for metadata requests.
  *
  * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
  * requests with special handling for completed receives of such requests.


 




> Fix incorrect comment in MetadataUpdater
> 
>
> Key: KAFKA-6405
> URL: https://issues.apache.org/jira/browse/KAFKA-6405
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: guangxian.liao
>Priority: Trivial
>
> The comment for 'handleDisconnection' says it can return true or false, but 
> the return type is void.
> {code:java}
> /**
>  * If `request` is a metadata request, handles it and return `true`. 
> Otherwise, returns `false`.
>  *
>  * This provides a mechanism for the `MetadataUpdater` implementation to 
> use the NetworkClient instance for its own
>  * requests with special handling for disconnections of such requests.
>  * @param destination
>  */
> void handleDisconnection(String destination);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356331#comment-16356331
 ] 

Guozhang Wang commented on KAFKA-6106:
--

[~ckamal] Are you still working on this issue?

> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Guozhang Wang
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: newbie++
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, the thread 
> will start processing A immediately, even though it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A slows down the restoration of 
> task B, since the thread is single-threaded. So the thread's transition to 
> {{RUNNING}}, which happens once all of its assigned tasks have completed 
> restoring and can be processed, will be delayed.
> Note that the streams instance's state only transitions to {{RUNNING}} when 
> all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed in this scenario.
> We should not start processing ready tasks immediately, but instead 
> focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the 
> overall time of the instance's state transition.
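
The delay described above can be illustrated with a rough sketch. This is a toy model, not Kafka Streams code: `RestoreFirstSketch` and `workUnitsUntilRunning` are hypothetical names, and the model assumes every restore step and every processing step costs one unit of single-threaded work.

```java
// Toy model of a single stream thread; all names are hypothetical.
final class RestoreFirstSketch {

    // Returns the units of work spent before every task is restored,
    // i.e. before the thread could move from TASKS_ASSIGNED to RUNNING.
    static int workUnitsUntilRunning(int[] restoreSteps, boolean processReadyTasksEarly) {
        int[] remaining = restoreSteps.clone();
        int units = 0;
        while (true) {
            // Pick the first task that still needs restoration.
            int next = -1;
            for (int i = 0; i < remaining.length; i++) {
                if (remaining[i] > 0) {
                    next = i;
                    break;
                }
            }
            if (next < 0) {
                return units; // nothing left to restore
            }
            remaining[next]--; // one restore step
            units++;

            boolean allRestored = true;
            for (int r : remaining) {
                if (r > 0) {
                    allRestored = false;
                }
            }
            if (allRestored) {
                return units; // thread can transition to RUNNING now
            }
            if (processReadyTasksEarly) {
                // Current behaviour in this model: already restored tasks
                // are processed in between restore steps.
                for (int r : remaining) {
                    if (r == 0) {
                        units++;
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] steps = {2, 5}; // task A needs 2 restore steps, task B needs 5
        System.out.println(workUnitsUntilRunning(steps, true));  // prints 12
        System.out.println(workUnitsUntilRunning(steps, false)); // prints 7
    }
}
```

In this model, postponing processing until all tasks are restored reaches RUNNING after 7 units instead of 12; the real numbers depend on actual restore and processing costs.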





[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-02-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356321#comment-16356321
 ] 

Ismael Juma commented on KAFKA-6514:


[~allenxwang], the tags are part of the name in JMX and hence it would be a 
breaking change for anyone using the JmxReporter (which is enabled by default). 
If there are good arguments to break such uses, it can be considered as part of 
the KIP discussion.

If you search for "metric", you can find a few KIPs that were posted for adding 
or updating existing metrics:

[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
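
To show why an extra tag is a breaking change for JMX users, here is a small sketch using the standard `javax.management.ObjectName` class. The object names are illustrative only: the `version=5` property is a hypothetical tag, and `MetricNameSketch` and `parse` are made-up helper names.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class MetricNameSketch {

    // Parses a JMX object name, wrapping the checked exception for brevity.
    static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName current = parse(
            "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
        // Hypothetical name with an added version tag.
        ObjectName tagged = parse(
            "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=5");

        // Key properties are part of the MBean identity, so the names differ.
        System.out.println(current.equals(tagged));           // prints false
        System.out.println(tagged.getKeyProperty("version")); // prints 5
    }
}
```

A dashboard or alert that looks up the first name exactly would no longer find the metric once the tag is added.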

> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
>
> After we upgrade the brokers to a new version, one important insight is how 
> many clients have been upgraded, so that we can switch the message format once 
> most of the clients have also been updated to the new version, minimizing the 
> performance penalty. 
> RequestsPerSec with the version tag will give us that insight.
>  
>  





[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the 
OfflineReplica state, possibly because of a broker going offline, the following 
call stack occurs on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     inside a for-loop for each partition 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having id 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0
 4. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 6. Verify that the following log message appears over 200 times in the 
controller.log file, one for each iteration of the a0 partitions
 "Leader not yet assigned for partition [a0,..]. Skip sending 
UpdateMetadataRequest."

What happened was 
 1. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
 2. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) entries of the logs above.
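
The 10 x 10 count can be reproduced with a simplified model. This is only a sketch, not the controller's code: `DeletionIterationSketch` and its methods are hypothetical names, and partitions are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified cost model; names are illustrative, not Kafka's.
final class DeletionIterationSketch {

    // Behaviour described above: handling each affected partition also
    // walks the whole partitions-to-be-deleted set.
    static int visitsBeforeFix(List<String> affected, List<String> toBeDeleted) {
        int visits = 0;
        for (String ignoredAffected : affected) {
            for (String ignoredDeleted : toBeDeleted) {
                visits++; // one "Leader not yet assigned ..." log line
            }
        }
        return visits;
    }

    // Proposed behaviour: only the partitions passed in are processed, so a
    // to-be-deleted partition is visited only if it is itself affected.
    static int visitsAfterFix(List<String> affected, List<String> toBeDeleted) {
        int visits = 0;
        for (String partition : affected) {
            if (toBeDeleted.contains(partition)) {
                visits++;
            }
        }
        return visits;
    }

    static List<String> partitionsOf(String topic, int count) {
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            partitions.add(topic + "-" + i);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> a0 = partitionsOf("a0", 10); // retained for deletion
        List<String> a1 = partitionsOf("a1", 10); // replicas going offline
        System.out.println(visitsBeforeFix(a1, a0)); // prints 100
        System.out.println(visitsAfterFix(a1, a0));  // prints 0
    }
}
```

The two occurrences listed above (controlled shutdown and broker failure) each contribute one such pass, which accounts for the roughly 200 log lines.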

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to the 
OfflineReplica state, possibly because of a broker going offline, the following 
call stack occurs on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    

[jira] [Created] (KAFKA-6540) Consumer lag metric is not updated when a partition is paused

2018-02-07 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6540:
--

 Summary: Consumer lag metric is not updated when a partition is 
paused
 Key: KAFKA-6540
 URL: https://issues.apache.org/jira/browse/KAFKA-6540
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


When a partition is paused, we no longer include it in fetches, which means we 
do not get updates to the high watermark. Since lag is computed based on the 
high watermark we've received in the most recent fetch, this means that the 
reported lag of a paused partition will be stuck at whatever value it had when 
the partition was paused. A possible workaround is to continue fetching the 
partition, but set the max requested bytes for that partition to 0.
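
A minimal sketch of the stale value, assuming lag is simply the last high watermark seen by the client minus the consumer position. This does not use the Kafka client; `PausedLagSketch` and its methods are hypothetical names.

```java
// Hypothetical model; not the consumer's actual fetcher code.
final class PausedLagSketch {
    private long lastSeenHighWatermark; // updated only by fetch responses
    private final long position;        // next offset the consumer will read
    private boolean paused;

    PausedLagSketch(long highWatermark, long position) {
        this.lastSeenHighWatermark = highWatermark;
        this.position = position;
    }

    void pause() {
        paused = true;
    }

    // The broker's high watermark moved; the client only learns about it
    // if the partition is still included in fetches.
    void brokerAdvancedTo(long brokerHighWatermark) {
        if (!paused) {
            lastSeenHighWatermark = brokerHighWatermark;
        }
    }

    long reportedLag() {
        return lastSeenHighWatermark - position;
    }

    public static void main(String[] args) {
        PausedLagSketch partition = new PausedLagSketch(100, 90);
        System.out.println(partition.reportedLag()); // prints 10
        partition.pause();
        partition.brokerAdvancedTo(500);
        System.out.println(partition.reportedLag()); // still prints 10
    }
}
```

With the workaround mentioned above (keep fetching, but request 0 bytes), the high watermark would keep updating in this model while no records are returned.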





[jira] [Commented] (KAFKA-6519) Change log level from ERROR to WARN for not leader for this partition exception

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356277#comment-16356277
 ] 

ASF GitHub Bot commented on KAFKA-6519:
---

hachikuji closed pull request #4501: KAFKA-6519: Reduce log level for normal 
replica fetch errors
URL: https://github.com/apache/kafka/pull/4501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 23f53569a6b..e84472f06bb 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: 
String,
   }
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
-new ConsumerFetcherThread(
-  "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, 
sourceBroker.id),
-  config, sourceBroker, partitionMap, this)
+new ConsumerFetcherThread(consumerIdString, fetcherId, config, 
sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: 
Cluster) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 705dc249bf3..ac83fa17a76 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset
 
 @deprecated("This class has been deprecated and will be removed in a future 
release. " +
 "Please use org.apache.kafka.clients.consumer.internals.Fetcher 
instead.", "0.11.0.0")
-class ConsumerFetcherThread(name: String,
+class ConsumerFetcherThread(consumerIdString: String,
+fetcherId: Int,
 val config: ConsumerConfig,
 sourceBroker: BrokerEndPoint,
 partitionMap: Map[TopicPartition, 
PartitionTopicInfo],
 val consumerFetcherManager: ConsumerFetcherManager)
-extends AbstractFetcherThread(name = name,
+extends AbstractFetcherThread(name = 
s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}",
   clientId = config.clientId,
   sourceBroker = sourceBroker,
   fetchBackOffMs = 
config.refreshLeaderBackoffMs,
@@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
+  this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, 
leaderId=${sourceBroker.id}, " +
+s"fetcherId=$fetcherId] "
+
   private val clientId = config.clientId
   private val fetchSize = config.fetchMessageMaxBytes
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 39a70321a6e..8d787c96da6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String,
  val sourceBroker: BrokerEndPoint,
  fetchBackOffMs: Int = 0,
  isInterruptible: Boolean = true,
- includeLogTruncation: Boolean
-)
+ includeLogTruncation: Boolean)
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
@@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String,
 
   private def processFetchRequest(fetchRequest: REQ) {
 val partitionsWithError = mutable.Set[TopicPartition]()
-
 var responseData: Seq[(TopicPartition, PD)] = Seq.empty
 
 try {
-  trace(s"Issuing fetch to broker ${sourceBroker.id}, request: 
$fetchRequest")
+  trace(s"Sending fetch request $fetchRequest")
   responseData = fetch(fetchRequest)
 } catch {
   case t: Throwable =>
 if (isRunning) {
-  warn(s"Error in fetch to broker ${sourceBroker.id}, request 
$fetchRequest", t)
+  warn(s"Error in response for fetch request $fetchRequest", t)
   inLock(partitionMapLock) {
 partitionsWithError ++= partitionStates.partitionSet.asScala
 // there is an error 

[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356231#comment-16356231
 ] 

Matthias J. Sax commented on KAFKA-6535:


So you don't think that adding a "config" to the API would be useful? Something 
like

{{stream.through("topic", Produced.with(...).enablePurgeDataAfterRead());}}

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.





[jira] [Resolved] (KAFKA-4641) Improve test coverage of StreamsThread

2018-02-07 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4641.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 1.2.0

> Improve test coverage of StreamsThread
> --
>
> Key: KAFKA-4641
> URL: https://issues.apache.org/jira/browse/KAFKA-4641
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Damian Guy
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: newbie
> Fix For: 1.2.0
>
>
> Some methods in {{StreamThread}} have little or no coverage.
> In particular:
> {{maybeUpdateStandbyTasks}} has little to no coverage
> Committing of StandbyTasks in {{commitAll}}
> {{maybePunctuate}}
> {{commitOne}} - no tests for exceptions
> {{unAssignChangeLogPartitions}} - no tests for exceptions
> {{addStreamsTask}} - no tests for exceptions
> {{runLoop}}
> Please see coverage report attached to parent





[jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356147#comment-16356147
 ] 

ASF GitHub Bot commented on KAFKA-4641:
---

guozhangwang closed pull request #4531: KAFKA-4641: Add more unit test for 
stream thread
URL: https://github.com/apache/kafka/pull/4531
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 064a2935515..5e25d02973a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1183,8 +1183,12 @@ public String toString(final String indent) {
 return sb.toString();
 }
 
-// this is for testing only
+// the following are for testing only
 TaskManager taskManager() {
 return taskManager;
 }
+
+Map>> standbyRecords() 
{
+return standbyRecords;
+}
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14503c..cc056044792 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -40,7 +41,13 @@
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -100,13 +107,16 @@ public void setUp() {
 }
 
 private final String topic1 = "topic1";
+private final String topic2 = "topic2";
 
 private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
 private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
+private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
 
 // task0 is unused
 private final TaskId task1 = new TaskId(0, 1);
 private final TaskId task2 = new TaskId(0, 2);
+private final TaskId task3 = new TaskId(1, 1);
 
 private Properties configProps(final boolean enableEos) {
 return new Properties() {
@@ -129,7 +139,7 @@ private Properties configProps(final boolean enableEos) {
 public void testPartitionAssignmentChangeForSingleGroup() {
 internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
-final StreamThread thread = getStreamThread();
+final StreamThread thread = createStreamThread(clientId, config, 
false);
 
 final StateListenerStub stateListener = new StateListenerStub();
 thread.setStateListener(stateListener);
@@ -685,10 +695,6 @@ public void onChange(final Thread thread, final 
ThreadStateTransitionValidator n
 }
 }
 
-private StreamThread getStreamThread() {
-return createStreamThread(clientId, config, false);
-}
-
 @Test
 public void shouldReturnActiveTaskMetadataWhileRunningState() throws 
InterruptedException {
 internalTopologyBuilder.addSource(null, "source", null, null, null, 
topic1);
@@ -759,6 +765,151 @@ public void 
shouldReturnStandbyTaskMetadataWhileRunningState() throws Interrupte
 assertTrue(threadMetadata.activeTasks().isEmpty());
 }
 
+@SuppressWarnings("unchecked")
+@Test
+public void shouldUpdateStandbyTask() {
+final String storeName1 

[jira] [Commented] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder

2018-02-07 Thread Leonardo Toledo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356145#comment-16356145
 ] 

Leonardo Toledo commented on KAFKA-3910:


Hi, can someone tell me whether there are plans to fix and close this issue? 

Thanks in advance. 

> Cyclic schema support in ConnectSchema and SchemaBuilder
> 
>
> Key: KAFKA-3910
> URL: https://issues.apache.org/jira/browse/KAFKA-3910
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: John Hofman
>Assignee: Shikhar Bhushan
>Priority: Major
>
> Cyclic schemas are not supported by ConnectSchema or SchemaBuilder. 
> Consequently, the AvroConverter (confluentinc/schema-registry) hits a stack 
> overflow when converting a cyclic avro schema, e.g:
> {code}
> {"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}
> {code}
> This is a blocking issue for all connectors running on the connect framework 
> with data containing cyclic references. The AvroConverter cannot support 
> cyclic schemas until the underlying ConnectSchema and SchemaBuilder do.
> To reproduce the stack-overflow (Confluent-3.0.0):
> Produce some cyclic data:
> {code}
> bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test 
> --property value.schema='{"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}'
> {"value":1,"next":null} 
> {"value":1,"next":{"list":{"value":2,"next":null}}}
> {code}
> Then try to consume it with connect:
> {code:title=connect-console-sink.properties}
> name=local-console-sink 
> connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector 
> tasks.max=1 
> topics=test
> {code}
> {code}
> ./bin/connect-standalone 
> ./etc/schema-registry/connect-avro-standalone.properties 
> connect-console-sink.properties  
> … start up logging … 
> java.lang.StackOverflowError 
>  at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) 
>  at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
> {code}
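
The repeating `toConnectSchema` frames are what an unguarded recursive walk over a self-referencing schema looks like. As a sketch of the general technique only (not the AvroConverter's code and not a proposed patch), a traversal can track the schema names it has already entered. `CyclicSchemaSketch`, `RecordSchema` and `countSchemas` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a named record schema with typed fields.
final class CyclicSchemaSketch {

    static final class RecordSchema {
        final String name;
        final List<RecordSchema> fieldTypes = new ArrayList<>();

        RecordSchema(String name) {
            this.name = name;
        }
    }

    // Counts distinct named schemas reachable from the root. The `seen`
    // set stops the recursion when a schema refers back to itself.
    static int countSchemas(RecordSchema schema, Set<String> seen) {
        if (!seen.add(schema.name)) {
            return 0; // already visited: do not recurse again
        }
        int count = 1;
        for (RecordSchema fieldType : schema.fieldTypes) {
            count += countSchemas(fieldType, seen);
        }
        return count;
    }

    public static void main(String[] args) {
        // Mirrors the "list" record above: its "next" field refers to "list".
        RecordSchema list = new RecordSchema("list");
        list.fieldTypes.add(list);
        System.out.println(countSchemas(list, new HashSet<>())); // prints 1
    }
}
```

Without the `seen` check, the same walk would recurse until the stack overflows, which matches the trace above.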





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356140#comment-16356140
 ] 

Guozhang Wang commented on KAFKA-6535:
--

I see your point now. And yes, I agree that we could improve the documentation 
to educate users about this point.

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.





[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-02-07 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4969:
---
Fix Version/s: 1.1.0

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or 
> multiple stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.
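
As a rough illustration of what a cost-based placement could look like, here is a sketch under stated assumptions: the cost formula (1 plus 10 per store) and the greedy least-loaded strategy are made up for this example, and `TaskCostSketch`, `Task` and `assign` are hypothetical names, not part of the assignor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical cost model and greedy placement; not the actual assignor.
final class TaskCostSketch {

    static final class Task {
        final String id;
        final int stores;

        Task(String id, int stores) {
            this.id = id;
            this.stores = stores;
        }

        // Assumed weights: every task costs 1, each store adds 10.
        int cost() {
            return 1 + 10 * stores;
        }
    }

    // Places the most expensive tasks first, each on the instance with the
    // lowest accumulated cost so far.
    static List<List<Task>> assign(List<Task> tasks, int instances) {
        List<Task> sorted = new ArrayList<>(tasks);
        sorted.sort((a, b) -> Integer.compare(b.cost(), a.cost()));

        List<List<Task>> placement = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            placement.add(new ArrayList<>());
        }
        int[] load = new int[instances];
        for (Task task : sorted) {
            int target = 0;
            for (int i = 1; i < instances; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            placement.get(target).add(task);
            load[target] += task.cost();
        }
        return placement;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("stateless-1", 0));
        tasks.add(new Task("stateless-2", 0));
        tasks.add(new Task("stateful-1", 1));
        tasks.add(new Task("stateful-2", 1));
        for (List<Task> instance : assign(tasks, 2)) {
            StringBuilder ids = new StringBuilder();
            for (Task task : instance) {
                ids.append(task.id).append(' ');
            }
            System.out.println(ids.toString().trim());
        }
    }
}
```

For the 2 stateless / 2 stateful example above, this places one stateful and one stateless task on each instance. A real cost model would also weigh sources/sinks, processors, and standby tasks, as listed in the description.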





[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355902#comment-16355902
 ] 

ASF GitHub Bot commented on KAFKA-6367:
---

mjsax closed pull request #4507: KAFKA-6367: StateRestoreListener use actual 
last restored offset for restored batch
URL: https://github.com/apache/kafka/pull/4507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index c80a736734d..ea1c2888409 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -43,7 +43,7 @@
  * @param topicPartition the TopicPartition containing the values to 
restore
  * @param storeName  the name of the store undergoing restoration
  * @param startingOffset the starting offset of the entire restoration 
process for this TopicPartition
- * @param endingOffset   the ending offset of the entire restoration 
process for this TopicPartition
+ * @param endingOffset   the exclusive ending offset of the entire 
restoration process for this TopicPartition
  */
 void onRestoreStart(final TopicPartition topicPartition,
 final String storeName,
@@ -62,7 +62,7 @@ void onRestoreStart(final TopicPartition topicPartition,
  *
  * @param topicPartition the TopicPartition containing the values to 
restore
  * @param storeName the name of the store undergoing restoration
- * @param batchEndOffset the ending offset for the current restored batch 
for this TopicPartition
+ * @param batchEndOffset the inclusive ending offset for the current 
restored batch for this TopicPartition
  * @param numRestored the total number of records restored in this batch 
for this TopicPartition
  */
 void onBatchRestored(final TopicPartition topicPartition,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ba17ce95ede..b11c45ba313 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -273,12 +273,14 @@ private long processNext(final 
List> records,
 long nextPosition = -1;
 int numberRecords = records.size();
 int numberRestored = 0;
+long lastRestoredOffset = -1;
 for (final ConsumerRecord record : records) {
 final long offset = record.offset();
 if (restorer.hasCompleted(offset, endOffset)) {
 nextPosition = record.offset();
 break;
 }
+lastRestoredOffset = offset;
 numberRestored++;
 if (record.key() != null) {
 restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
@@ -295,8 +297,7 @@ private long processNext(final List> records,
 
 if (!restoreRecords.isEmpty()) {
 restorer.restore(restoreRecords);
-restorer.restoreBatchCompleted(nextPosition, records.size());
-
+restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
 }
 
 return nextPosition;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ee964513415..e69cede23fd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -234,15 +234,28 @@ public void shouldRestoreAndNotifyMultipleStores() throws 
Exception {
 assertThat(callbackTwo.restored.size(), equalTo(3));
 
 assertAllCallbackStatesExecuted(callback, "storeName1");
-assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);
 
 assertAllCallbackStatesExecuted(callbackOne, "storeName2");
-assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);
 
 assertAllCallbackStatesExecuted(callbackTwo, 

[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-07 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355829#comment-16355829
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

[~junrao] The number of children of isr_change_notification isn't the problem. 
It's the size of the children of isr_change_notification that is the problem. 
In theory, with a large enough cluster (I would guess around 400 or 500 brokers 
with a large number of partitions per broker) you'd run into problems with the 
number of children.

The specific situation I'm referring to is the broker attempting to write a 
child of isr_change_notification which exceeds 1MB. I just submitted a PR that 
addresses this.

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When writes to /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen failures in one of our test clusters as the partition count started to 
> climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  





[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-07 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355828#comment-16355828
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

Heh, yeah James, we don't typically run clusters like that in production. We 
try not to exceed 5k per node in practice. We have some test clusters which end 
up in this state when people experiment though. It doesn't work well, but 
having enough replica fetcher and IO threads definitely helps. It isn't 
straightforward to tune the broker to deal with this kind of load.



[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355824#comment-16355824
 ] 

ASF GitHub Bot commented on KAFKA-6469:
---

ambroff opened a new pull request #4540: KAFKA-6469 Batch ISR change 
notifications
URL: https://github.com/apache/kafka/pull/4540
 
 
   When writes to /isr_change_notification in ZooKeeper (which is
   effectively a queue of ISR change events for the controller) happen at
   a rate high enough that the node with a watch can't dequeue them, the
   trouble starts.
   
   The watcher kafka.controller.IsrChangeNotificationListener is fired in
   the controller when a new entry is written to
   /isr_change_notification, and the zkclient library sends a
   GetChildrenRequest to zookeeper to fetch all child znodes.
   
   We've seen failures in one of our test clusters as the partition count
   started to climb north of 60k per broker. We had brokers writing child
   nodes under /isr_change_notification that were larger than the
   jute.maxbuffer size in ZooKeeper (1MB), causing the ZooKeeper server
   to drop the controller's session, effectively bricking the cluster.
   
   This can be partially mitigated by chunking ISR notifications to
   increase the maximum number of partitions a broker can host, which is
   the purpose of this patch.
   
   KafkaZkClient#propagateIsrChanges() now batches the set of
   TopicPartitions that will be written to the queue into sets of
   isr.notification.batch.size, which defaults to 3000. This default
   value is an approximate size that will guarantee that the JSON
   serialized collection will always be well under 1MB.
   
   You can see the worst case scenario in
   KafkaZkClientTest#testPropagateLargeNumberOfIsrChanges(), where a set
   of 5000 TopicPartitions are provided which have the longest possible
   JSON representation. This leads to a JSON payload that is around
   850k, leaving headroom for additional metadata.
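
The batching described above can be sketched roughly as follows. This is only an illustration under assumptions: IsrChangeBatcher and its batch() method are hypothetical names, not the actual KafkaZkClient code, and the batch size of 3000 simply mirrors the default mentioned in the PR description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not the actual KafkaZkClient implementation: splits a
// list of changed partitions into batches of at most batchSize elements, so
// that each batch can be written as its own child znode.
final class IsrChangeBatcher {
    static <T> List<List<T>> batch(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so each batch is independent of the input list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < 7000; i++) {
            partitions.add("topic-" + i);
        }
        // Prints the size of each batch produced for 7000 entries.
        for (List<String> b : batch(partitions, 3000)) {
            System.out.println(b.size());
        }
    }
}
```

Each batch would then be serialized and written separately, which keeps the size of any single znode bounded by the batch size rather than by the total number of changed partitions.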
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-07 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355785#comment-16355785
 ] 

Robert Yokota commented on KAFKA-6504:
--

[~steff1193], it's not just the metric name change. The PR also replaces calls 
to metricGroup.metrics().sensor() with metricGroup.sensor(). This ensures that 
sensor names are prefixed with the metric group ID, so that sensors are not 
shared across metric groups.
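
To illustrate why the prefix matters, here is a minimal sketch. SensorRegistry and TaskMetricGroup are hypothetical stand-ins, not the Kafka Connect classes, and the ":" separator is an assumption made only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry: returns the existing sensor when a name is already
// registered, which is what makes unprefixed names collide across tasks.
final class SensorRegistry {
    private final Map<String, AtomicLong> sensors = new HashMap<>();

    AtomicLong sensor(String name) {
        return sensors.computeIfAbsent(name, n -> new AtomicLong());
    }
}

// Hypothetical per-task metric group.
final class TaskMetricGroup {
    private final SensorRegistry registry;
    private final String groupId;

    TaskMetricGroup(SensorRegistry registry, String groupId) {
        this.registry = registry;
        this.groupId = groupId;
    }

    // Unprefixed lookup: every group asking for the same name gets one shared sensor.
    AtomicLong sharedSensor(String name) {
        return registry.sensor(name);
    }

    // Prefixed lookup: the group ID makes the sensor name unique per group.
    AtomicLong sensor(String name) {
        return registry.sensor(groupId + ":" + name);
    }

    public static void main(String[] args) {
        SensorRegistry registry = new SensorRegistry();
        TaskMetricGroup task0 = new TaskMetricGroup(registry, "my-connector-0");
        TaskMetricGroup task1 = new TaskMetricGroup(registry, "my-connector-1");

        task0.sharedSensor("source-record-active-count").incrementAndGet();
        // Task 1 sees task 0's increment because the sensor is shared.
        System.out.println(task1.sharedSensor("source-record-active-count").get());

        task0.sensor("source-record-active-count").incrementAndGet();
        // With the prefix, task 1 has its own sensor and is unaffected.
        System.out.println(task1.sensor("source-record-active-count").get());
    }
}
```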

> Connect: Some per-task-metrics not working
> --
>
> Key: KAFKA-6504
> URL: https://issues.apache.org/jira/browse/KAFKA-6504
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Minor
>
> Some Kafka-Connect-metrics seem to be wrong with respect to per-task - at 
> least it seems like MBean 
> "kafka.connect:type=source-task-metrics,connector=,task=x" 
> attribute "source-record-active-count" reports the same number for all x 
> tasks running in the same Kafka-Connect instance/JVM. E.g. if I have a 
> source-connector "my-connector" with 2 tasks that both run in the same 
> Kafka-Connect instance, but I know that only one of them actually produces 
> anything (and therefore can have "active source-records") both 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go 
> up (following each other). It should only go up for the one task that 
> actually produces something.





[jira] [Assigned] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-07 Thread Robert Yokota (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota reassigned KAFKA-6504:


Assignee: Robert Yokota



[jira] [Resolved] (KAFKA-6532) Delegation token internals should not impact public interfaces

2018-02-07 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6532.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

> Delegation token internals should not impact public interfaces
> --
>
> Key: KAFKA-6532
> URL: https://issues.apache.org/jira/browse/KAFKA-6532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> We need to make sure that code related to the internal delegation tokens 
> implementation doesn't have any impact on public interfaces, including 
> customizable callback handlers from KIP-86.
>  # KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal 
> builders are configurable and we now expect custom principal builders to set 
> this value. Since we allow the same endpoint to be used for basic SCRAM and 
> delegation tokens, the configured principal builder needs a way of detecting 
> token authentication. Default principal builder does this using internal 
> SCRAM implementation code. It would be better if configurable principal 
> builders didn't have to set this flag at all.
>  # It would be better to replace 
> _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more 
> generic _ScramExtensionsCallback_. This will allow us to add more extensions 
> in future and it will also enable custom Scram extensions.
>  # _ScramCredentialCallback_ was extended to add _tokenOwner_ and mechanism. 
> Mechanism is determined during SASL handshake and shouldn't be configurable 
> in a callback handler. _ScramCredentialCallback_ is being made a public 
> interface in KIP-86 with configurable callback handlers. Since delegation 
> token implementation is internal and not extensible, _tokenOwner_ should be 
> in a delegation-token-specific callback.
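
As a rough illustration of the second point, a generic extensions callback could look something like the sketch below. The class shape, the isTokenAuth() helper and the "tokenauth" key are assumptions made for this example only; the issue does not specify the final API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;

// Hypothetical sketch of a generic SCRAM extensions callback. It carries an
// arbitrary key/value map instead of a delegation-token-specific flag, so new
// extensions can be added without changing the callback type.
final class ExtensionsCallbackSketch implements Callback {
    private Map<String, String> extensions = Collections.emptyMap();

    // Returns the extensions supplied by the callback handler (never null).
    Map<String, String> extensions() {
        return extensions;
    }

    // Called by the callback handler to supply extensions.
    void extensions(Map<String, String> extensions) {
        this.extensions = Collections.unmodifiableMap(new HashMap<>(extensions));
    }

    // Illustrative helper: token authentication is detected from an extension
    // (the key name is an assumption) rather than from a dedicated callback.
    static boolean isTokenAuth(ExtensionsCallbackSketch callback) {
        return "true".equals(callback.extensions().get("tokenauth"));
    }

    public static void main(String[] args) {
        ExtensionsCallbackSketch callback = new ExtensionsCallbackSketch();
        callback.extensions(Collections.singletonMap("tokenauth", "true"));
        System.out.println(isTokenAuth(callback));
    }
}
```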





[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-07 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6529:
--
Fix Version/s: 1.0.2
   0.11.0.3
   1.1.0

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Graham Campbell
>Priority: Major
> Fix For: 1.1.0, 0.11.0.3, 1.0.2
>
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, when the broker attempts to send an ack to 
> the client it receives an IOException because the TCP socket has been closed. 
> This triggers the Selector to close the channel, but because it still has 
> pending requests, it adds it to Selector.closingChannels to process those 
> requests. However, because that exception was triggered by trying to send a 
> response, the SocketServer.Processor has marked the channel as muted and will 
> no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (e.g. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks file descriptor previously used for TCP socket and memory 
> for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in 
> Selector.pollSelectionKeys(...) record that that connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends. Then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.
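
The proposed solution can be sketched in a heavily simplified form as follows. SketchChannel and SelectorSketch are hypothetical names rather than the Kafka network classes, and the sketch handles only IOException, whereas the proposal treats any exception during the write as a failed send.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical minimal channel abstraction for this sketch.
interface SketchChannel {
    String id();
    void write() throws IOException;
}

// Hypothetical, simplified model of the proposal: remember which channels
// failed a send, then re-raise so the normal disconnect path still runs.
final class SelectorSketch {
    final Set<String> failedSends = new LinkedHashSet<>();
    final Set<String> disconnected = new LinkedHashSet<>();

    void pollSelectionKeys(Iterable<SketchChannel> channels) {
        for (SketchChannel channel : channels) {
            try {
                attemptWrite(channel);
            } catch (IOException e) {
                // Existing behaviour: an exception here triggers a disconnect.
                disconnected.add(channel.id());
            }
        }
    }

    private void attemptWrite(SketchChannel channel) throws IOException {
        try {
            channel.write();
        } catch (IOException e) {
            // New behaviour: record the failed send before re-raising.
            failedSends.add(channel.id());
            throw e;
        }
    }

    // Test helper: a channel that optionally fails on write.
    static SketchChannel channel(final String id, final boolean failOnWrite) {
        return new SketchChannel() {
            @Override public String id() { return id; }
            @Override public void write() throws IOException {
                if (failOnWrite) {
                    throw new IOException("socket closed by peer");
                }
            }
        };
    }

    public static void main(String[] args) {
        SelectorSketch selector = new SelectorSketch();
        selector.pollSelectionKeys(Arrays.asList(channel("healthy", false), channel("broken", true)));
        System.out.println("failedSends=" + selector.failedSends);
        System.out.println("disconnected=" + selector.disconnected);
    }
}
```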





[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-07 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355562#comment-16355562
 ] 

Randall Hauch commented on KAFKA-6504:
--

[~rayokota], ping. Please see [~steff1193]'s comments. 

Also, [~hachikuji] this issue was not closed when the PR was merged, and 
there's no fix version.



[jira] [Assigned] (KAFKA-2423) Introduce Scalastyle

2018-02-07 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2423:
--

Assignee: Ray Chiang  (was: Grant Henke)

> Introduce Scalastyle
> 
>
> Key: KAFKA-2423
> URL: https://issues.apache.org/jira/browse/KAFKA-2423
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ray Chiang
>Priority: Major
>
> This is similar to Checkstyle (which we already use), but for Scala:
> http://www.scalastyle.org/





[jira] [Resolved] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread Ahmed Madkour (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Madkour resolved KAFKA-6524.
--
Resolution: Information Provided

> kafka mirror can't producer internal topic 
> ---
>
> Key: KAFKA-6524
> URL: https://issues.apache.org/jira/browse/KAFKA-6524
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: Ahmed Madkour
>Priority: Minor
>
> We are using kafka-mirror-maker.sh to consume data from a 3-broker Kafka 
> cluster and produce the data to another single-broker Kafka cluster.
> We want to include internal topics so we added the following in the consumer 
> configuration
> exclude.internal.topics=false
> We keep receiving the following errors:
> {code:java}
> org.apache.kafka.common.errors.InvalidTopicException: The request attempted 
> to perform an operation on an invalid topic.
>  ERROR Error when sending message to topic __consumer_offsets with key: 43 
> bytes, value: 28 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> {code}
> It seems that the producer can't access the internal topic __consumer_offsets.
> Any way to fix that?





[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread Ahmed Madkour (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355471#comment-16355471
 ] 

Ahmed Madkour commented on KAFKA-6524:
--

Thank you, I am going to close the issue now.



[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355451#comment-16355451
 ] 

huxihx commented on KAFKA-6524:
---

[~omkreddy] Sorry to mistake you for the reporter:)

[~ahmed.madkour] As Manikumar mentioned, all PRODUCE requests with the client 
ID "__admin_client" will be accepted and processed by the broker, even for 
internal topics.



[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355377#comment-16355377
 ] 

Manikumar commented on KAFKA-6524:
--

[~huxi_2b] Thanks for the pointer. I looked into the code and had never noticed 
this before: we allow produce requests to internal topics when 
"client.id=__admin_client" is set.
[~ahmed.madkour] If you are satisfied with the solution, you can close the issue.
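
The behaviour described in this comment can be sketched as a simple check. This is an illustration only: InternalTopicCheckSketch and mayProduce() are hypothetical names and the set of internal topics is reduced to the one mentioned in the issue; it is not the broker's actual code.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of the rule discussed in this thread: produce requests
// to an internal topic are rejected unless the client ID is "__admin_client".
final class InternalTopicCheckSketch {
    static final String ADMIN_CLIENT_ID = "__admin_client";

    // Only the internal topic named in the issue is listed here.
    static final Set<String> INTERNAL_TOPICS = Collections.singleton("__consumer_offsets");

    static boolean mayProduce(String topic, String clientId) {
        boolean internal = INTERNAL_TOPICS.contains(topic);
        return !internal || ADMIN_CLIENT_ID.equals(clientId);
    }

    public static void main(String[] args) {
        // A regular producer is rejected for the internal topic.
        System.out.println(mayProduce("__consumer_offsets", "mirror-maker-producer"));
        // The same request with the admin client ID is accepted.
        System.out.println(mayProduce("__consumer_offsets", ADMIN_CLIENT_ID));
    }
}
```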



[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread Ahmed Madkour (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355342#comment-16355342
 ] 

Ahmed Madkour commented on KAFKA-6524:
--

[~huxi_2b] I tried it and I no longer see the errors. So what does 
"client.id=__admin_client" configuration mean?



[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-07 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355320#comment-16355320
 ] 

Manikumar commented on KAFKA-6524:
--

[~huxi_2b] I'm not sure how "client.id" affects producing to internal topics. 
`client.id=__admin_client` is used by AdminClient.scala.



[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics

2018-02-07 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355257#comment-16355257
 ] 

james chien commented on KAFKA-6498:


[~guozhang] Now that I am familiar with Kafka metrics, I want to confirm the 
goals. The following two points are what I think needs to be resolved.

1) Currently, we enumerate all of the metrics but do not let users select 
specific metrics directly, so we have to add this.

2) Design a new interface that lets users manually add more `rocksDB metrics` 
in a custom way, as you mentioned above.

If both are right, I will write a KIP proposal.

> Add RocksDB statistics via Streams metrics
> --
>
> Key: KAFKA-6498
> URL: https://issues.apache.org/jira/browse/KAFKA-6498
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Reporter: Guozhang Wang
>Assignee: james chien
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's own stats can be programmatically exposed via 
> {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many 
> useful settings already. However, these stats are not exposed directly via 
> Streams today, so any users who want access to them have to interact 
> manually with the underlying RocksDB directly, not through 
> Streams.
> We should expose such stats via Streams metrics programmatically so that users 
> can investigate them without trying to access RocksDB directly.





[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR

2018-02-07 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355229#comment-16355229
 ] 

huxihx commented on KAFKA-6522:
---

[~bookxiao] Hmm. Based on the stack trace, ApiKeys [Line 
#73|https://github.com/apache/kafka/blob/f20e4b72d3f5af4539a8c280efcf51b92d6a06af/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java#L73]
 threw the IllegalArgumentException, which shows that the code comes from the 
0.10.2.x codebase. Try running the command below to see which APIs broker 401 
supports:
{code:java}
bin/kafka-broker-api-versions.sh  --bootstrap-server ***{code}
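
For context, the kind of range check that produces the exception message quoted in the issue can be sketched as below. ApiKeyLookupSketch is a hypothetical class written for this comment, not the ApiKeys source; the maximum ID of 20 is taken from the error message in the leader's log.

```java
// Hypothetical sketch of an API key lookup with a range check. A broker that
// only knows IDs 0..maxId rejects a request that carries a newer ID.
final class ApiKeyLookupSketch {
    static int forId(int id, int maxId) {
        if (id < 0 || id > maxId) {
            throw new IllegalArgumentException(String.format(
                "Unexpected ApiKeys id `%d`, it should be between `0` and `%d` (inclusive)",
                id, maxId));
        }
        return id;
    }

    public static void main(String[] args) {
        try {
            // A follower sends API key 23 to a leader that only knows keys 0 to 20.
            forId(23, 20);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the leader really is running older code than the follower, the follower's leader epoch request would hit a check like this on every retry, which would match the repeated errors in the logs.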
 

 

> Retrying leaderEpoch request for partition xxx as the leader reported an 
> error: UNKNOWN_SERVER_ERROR
> 
>
> Key: KAFKA-6522
> URL: https://issues.apache.org/jira/browse/KAFKA-6522
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 1.0.0
> Environment: Ubuntu 16.04 LTS 64bit-server
>Reporter: Wang Shuxiao
>Priority: Major
>
> We have 3 brokers in a Kafka cluster (broker IDs 401, 402, 403). Broker 403 
> fails to fetch data from the leader:
> {code:java}
> [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as 
> the leader reported an error: UNKNOWN_SERVER_ERROR 
> (kafka.server.ReplicaFetcherThread)
> [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, 
> fetcherId=3] Error when sending leader epoch request for 
> Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, 
> sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, 
> sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, 
> pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, 
> pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 
> -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, 
> pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, 
> __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, 
> sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 
> -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, 
> sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 
> -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, 
> smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, 
> __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> 
> -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 401 was disconnected before the response was read
>  at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
>  at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
>  at kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
>  at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
>  
> On the leader (broker-401) side, the log shows:
> {code:java}
> [2018-02-02 08:58:26,859] ERROR Closing socket for 192.168.100.101:9099-192.168.100.103:30476 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 23 and apiVersion: 0
> Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it should be between `0` and `20` (inclusive)
>  at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
>  at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
>  at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
>  at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:91)
>  at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
>  at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>  at kafka.network.Processor.run(SocketServer.scala:417)
>  at java.lang.Thread.run(Thread.java:745){code}
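
Reading the two logs together: the follower's leader epoch request appears to arrive at the leader with API key 23, and the leader rejects it because it only accepts keys 0 through 20. One possible explanation, not confirmed in the report, is that the leader runs an older broker release than the follower. The sketch below is a hypothetical illustration of such a range check, not the Kafka source. The class and method names are invented, and the bounds are copied from the error message above.

```java
// Hypothetical sketch (not Kafka code): a broker that only knows API keys
// 0..20 rejects any request whose key falls outside that range.
final class ApiKeyRangeSketch {
    static final int MIN_API_KEY = 0;
    static final int MAX_API_KEY = 20; // upper bound taken from the leader's error message

    // Returns the id unchanged when it is in range, otherwise throws with a
    // message shaped like the one in the leader log.
    static int checkApiKey(int id) {
        if (id < MIN_API_KEY || id > MAX_API_KEY) {
            throw new IllegalArgumentException(String.format(
                "Unexpected ApiKeys id `%d`, it should be between `%d` and `%d` (inclusive)",
                id, MIN_API_KEY, MAX_API_KEY));
        }
        return id;
    }
}
```

With these bounds, `checkApiKey(23)` throws, which matches the rejection seen on broker 401. If a version mismatch is the cause, the first thing to compare is the broker version on each node.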




[jira] [Comment Edited] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows

2018-02-07 Thread Ivan Smurygin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355107#comment-16355107
 ] 

Ivan Smurygin edited comment on KAFKA-6291 at 2/7/18 8:13 AM:
--

Here is another workaround

[https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java]

But the problem with gigabytes of unreleased memory still persists.


was (Author: smuryginim):
Here is another workaround

https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java

> Cannot close EmbeddedZookeeper on Windows
> -
>
> Key: KAFKA-6291
> URL: https://issues.apache.org/jira/browse/KAFKA-6291
> Project: Kafka
> Issue Type: Bug
> Components: zkclient
> Affects Versions: 0.11.0.0, 1.0.0
> Environment: Windows 10 (doesn't reproduce on Linux), JDK 8
> Reporter: Viliam Durina
> Priority: Major
>
> We created {{EmbeddedZookeeper}} and {{ZkClient}} for various tests using 
> this code:
> {code:java}
> EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
> ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.port(), 
> 3, 3, ZKStringSerializer$.MODULE$);
> zkClient.close();
> zkServer.shutdown();
> {code}
> This works fine on Linux, but on Windows, the {{zkServer.shutdown()}} call 
> fails with this exception:
> {code}
> [Thread-1] ERROR org.apache.kafka.test.TestUtils - Error deleting C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083
> java.nio.file.FileSystemException: C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083\version-2\log.1: The process cannot access the file because it is being used by another process.
>   at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
>   at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
>   at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>   at java.nio.file.Files.delete(Files.java:1126)
>   at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:630)
>   at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:619)
>   at java.nio.file.Files.walkFileTree(Files.java:2670)
>   at java.nio.file.Files.walkFileTree(Files.java:2742)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>   at org.apache.kafka.test.TestUtils$1.run(TestUtils.java:184)
> java.nio.file.FileSystemException: C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083\version-2\log.1: The process cannot access the file because it is being used by another process.
>   at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
>   at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
>   at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>   at java.nio.file.Files.delete(Files.java:1126)
>   at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:630)
>   at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:619)
>   at java.nio.file.Files.walkFileTree(Files.java:2670)
>   at java.nio.file.Files.walkFileTree(Files.java:2742)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>   at kafka.zk.EmbeddedZookeeper.shutdown(EmbeddedZookeeper.scala:53)
>   at com.hazelcast.jet.KafkaTest.test(KafkaTest.java:32)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> 
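
The trace above shows the temp-directory cleanup failing because `log.1` is still held open when the delete runs. As a rough illustration only, a test harness could wrap its own cleanup in a retry loop like the sketch below. This is a hypothetical helper, not the fix adopted in Kafka and not the linked workaround. The class name, method names, and retry parameters are all assumptions. It helps only if the open handle is eventually released, and it does not address the root cause.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical test-cleanup helper: delete a directory tree, retrying when a
// file is temporarily locked (as can happen on Windows).
final class RetryingDelete {

    // Deletes every file, then each directory, bottom-up.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // Tries the delete up to `attempts` times, pausing between failures.
    // Returns true once the tree is gone, false if every attempt failed.
    static boolean deleteWithRetry(Path root, int attempts, long pauseMillis) throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            try {
                deleteRecursively(root);
                return true;
            } catch (IOException e) {
                // FileSystemException ("being used by another process") lands here.
                Thread.sleep(pauseMillis);
            }
        }
        return false;
    }
}
```

A test could call `RetryingDelete.deleteWithRetry(tempDir, 5, 200)` after closing the client and server, and log a warning instead of failing when it returns `false`.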

[jira] [Commented] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows

2018-02-07 Thread Ivan Smurygin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355107#comment-16355107
 ] 

Ivan Smurygin commented on KAFKA-6291:
--

Here is another workaround

https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java


[jira] [Issue Comment Deleted] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows

2018-02-07 Thread Ivan Smurygin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Smurygin updated KAFKA-6291:
-
Comment: was deleted

(was: Hello! I faced the same issue during my testing.

Is there any progress on the ticket?)


[jira] [Commented] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows

2018-02-07 Thread Ivan Smurygin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355103#comment-16355103
 ] 

Ivan Smurygin commented on KAFKA-6291:
--

Hello! I faced the same issue during my testing.

Is there any progress on the ticket?
