[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-02-12 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361853#comment-16361853
 ] 

Apurva Mehta commented on KAFKA-6446:
-

IMO, the right solution here would be for the producer to transition to a 
`FATAL_ERROR` state so that the only possible operation afterward is to 
`close()` the producer. If a `transactionalId` is specified but 
the producer can't connect to the cluster and initialize transactions, it should 
not be able to do anything else. Otherwise we risk violating transactional semantics.
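
The proposal above can be illustrated with a small state machine. This is only a sketch of the idea, not the Kafka client: the class `FatalStateSketch`, its `State` enum and the `clusterReachable` flag are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transactional producer; not the Kafka client API. */
final class FatalStateSketch {
    enum State { UNINITIALIZED, READY, FATAL_ERROR, CLOSED }

    private State state = State.UNINITIALIZED;
    private final List<String> sent = new ArrayList<>();

    /** Simulates initTransactions(); 'clusterReachable' is an assumed knob for the demo. */
    void initTransactions(boolean clusterReachable) {
        requireUsable();
        if (!clusterReachable) {
            // Initialization failed: from now on only close() is accepted.
            state = State.FATAL_ERROR;
            throw new IllegalStateException("could not initialize transactions");
        }
        state = State.READY;
    }

    void send(String record) {
        requireUsable();
        if (state != State.READY) {
            throw new IllegalStateException("transactions not initialized");
        }
        sent.add(record);
    }

    /** close() is the one operation that is always allowed. */
    void close() {
        state = State.CLOSED;
    }

    State state() {
        return state;
    }

    private void requireUsable() {
        if (state == State.FATAL_ERROR) {
            throw new IllegalStateException("producer is in FATAL_ERROR; only close() is allowed");
        }
        if (state == State.CLOSED) {
            throw new IllegalStateException("producer is closed");
        }
    }
}
```

Once `initTransactions(false)` has failed, every later `send` or `initTransactions` call is rejected, so no record can slip out without the transactional guarantees in place.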

 

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Priority: Critical
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits 
> endlessly in initTransactions. 
> The timeouts don't apply to that operation: it doesn't honor 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> I attached an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  
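
Until the client bounds this call itself, one possible caller-side workaround is to run the blocking step on a helper thread and stop waiting after a deadline. The sketch below uses only `java.util.concurrent`; `BoundedInit`, `runWithTimeout` and the `blockingInit` parameter are hypothetical names, and `blockingInit` merely stands in for something like `producer::initTransactions`. It is not taken from the ticket or from the attached Test.java.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedInit {
    /**
     * Runs a potentially blocking initialization step and gives up after timeoutMs.
     * Returns true if the step finished in time, false if it timed out or the
     * waiting thread was interrupted.
     */
    static boolean runWithTimeout(Runnable blockingInit, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "init-with-timeout");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        Future<?> result = executor.submit(blockingInit);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupts the worker; the blocked call may or may not react
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("initialization failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

This only limits how long the caller waits. It does not make the underlying call honor any timeout, so a producer whose initialization timed out should be treated as unusable afterwards.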



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6503) Connect: Plugin scan is very slow

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361812#comment-16361812
 ] 

ASF GitHub Bot commented on KAFKA-6503:
---

rayokota closed pull request #4561: KAFKA-6503: Parallelize plugin scanning
URL: https://github.com/apache/kafka/pull/4561
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 345d7ef011d..c57caac9cba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -269,6 +269,7 @@ private PluginScanResult scanPluginPath(
         ConfigurationBuilder builder = new ConfigurationBuilder();
         builder.setClassLoaders(new ClassLoader[]{loader});
         builder.addUrls(urls);
+        builder.useParallelExecutor();
         Reflections reflections = new Reflections(builder);
 
         return new PluginScanResult(
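
The one-line change above asks the Reflections library to scan with a parallel executor. The general idea, running independent scan units on a thread pool and collecting the results in order, can be sketched with the JDK alone. `ParallelScanSketch`, `scanAll` and `scanOne` are hypothetical names; the snippet does not use Reflections and is not how `DelegatingClassLoader` is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ParallelScanSketch {
    /** Applies 'scanOne' to every location on a fixed thread pool; results keep the input order. */
    static <T> List<T> scanAll(List<String> locations, Function<String, T> scanOne) {
        int threads = Math.max(1, Math.min(locations.size(), Runtime.getRuntime().availableProcessors()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (String location : locations) {
                tasks.add(() -> scanOne.apply(location));
            }
            List<T> results = new ArrayList<>();
            // invokeAll blocks until all tasks are done and returns futures in task order.
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("scan interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("scan failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A call such as `ParallelScanSketch.scanAll(pluginDirs, dir -> inspect(dir))`, with `inspect` being whatever per-directory work is needed, would spread that work over the available cores.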


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
> Fix For: 1.1.0
>
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see logs from starting my Kafka-Connect instance at the bottom. 
> It takes almost 4 minutes scanning. I am running Kafka-Connect in docker 
> based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background while everything else was working, it 
> probably would not be a big issue. But that is not the case. Right after starting the 
> Kafka-Connect instance I try to create a connector via the /connectors 
> endpoint, but it does not succeed until the plugin scanning has finished (4 
> minutes).
> I am not even sure why scanning is necessary. Aren't connectors, converters 
> etc. always referred to by name, so that to see whether one exists you can 
> just try to load the class? The classloader will tell you if it is available. 
> Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  

[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-02-12 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361744#comment-16361744
 ] 

huxihx commented on KAFKA-6446:
---

[~apurva] Could you please take some time to look at the comments above? Thanks.



[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361725#comment-16361725
 ] 

Matthias J. Sax commented on KAFKA-6555:


I think KAFKA-6145 is not relevant here. Feel free to extend the ticket 
description of KAFKA-6144 directly and close this as a duplicate (no child 
ticket required).

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently queryable only when the StreamTask is 
> in the RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
> making the data inaccessible during this time could mean downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica task 
> has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, effectively making the partition unavailable. We can 
> make the state store partition queryable for the data already present in the 
> state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store non-queryable during this time means 
> downtime for the partition.
> It's a very important improvement as it could improve the availability 
> of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  
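
The trade-off described in the ticket, refusing queries while restoring versus serving possibly stale data, can be sketched with a toy store. Everything here is hypothetical (`RestoringStoreSketch`, `TaskState`, the `allowStaleReads` flag); it is not the Kafka Streams state store API, and `IllegalStateException` merely stands in for whatever error a real store would raise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class RestoringStoreSketch {
    enum TaskState { RESTORING, RUNNING }

    private final Map<String, Long> data = new HashMap<>();
    private final boolean allowStaleReads;
    private TaskState state = TaskState.RESTORING;

    RestoringStoreSketch(boolean allowStaleReads) {
        this.allowStaleReads = allowStaleReads;
    }

    /** Applies one changelog record while the task is catching up. */
    void restore(String key, long value) {
        data.put(key, value);
    }

    /** Marks restoration as finished. */
    void markRunning() {
        state = TaskState.RUNNING;
    }

    /**
     * Reads a key. While RESTORING this either fails (the behaviour the ticket
     * describes today) or returns whatever has been restored so far.
     */
    Optional<Long> get(String key) {
        if (state == TaskState.RESTORING && !allowStaleReads) {
            throw new IllegalStateException("store is not queryable while restoring");
        }
        return Optional.ofNullable(data.get(key));
    }
}
```

With `allowStaleReads` set, a caller gets the last restored value immediately and has to accept that it may lag behind the changelog until `markRunning()` is called.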





[jira] [Commented] (KAFKA-6472) WordCount example code error

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361671#comment-16361671
 ] 

ASF GitHub Bot commented on KAFKA-6472:
---

guozhangwang closed pull request #126: KAFKA-6472 - Fix WordCount example code 
error
URL: https://github.com/apache/kafka-site/pull/126
 
 
   


diff --git a/10/streams/tutorial.html b/10/streams/tutorial.html
index 0bc7314..11162e8 100644
--- a/10/streams/tutorial.html
+++ b/10/streams/tutorial.html
@@ -516,7 +516,7 @@ Writing a th
 
 
 
-counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
 
 


 




> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> A ")" is missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, line 31:
> {{31 .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{31 .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}





[jira] [Commented] (KAFKA-6472) WordCount example code error

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361668#comment-16361668
 ] 

ASF GitHub Bot commented on KAFKA-6472:
---

guozhangwang closed pull request #4538: KAFKA-6472 - Fix WordCount example code 
error
URL: https://github.com/apache/kafka/pull/4538
 
 
   


diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index 1c8c082ce97..f390b6df92b 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -516,7 +516,7 @@ Writing a th
 
 
 
-counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
+counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
 
 
 


 






[jira] [Comment Edited] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361613#comment-16361613
 ] 

Ashish Surana edited comment on KAFKA-6555 at 2/13/18 12:04 AM:


Ok, shouldn't it be a child ticket of KAFKA-6144, since the related tickets 
KAFKA-6145 & KAFKA-6031 don't completely eliminate the pause time during 
rebalancing?

KAFKA-6145 reduces the time the stream task spends in the rebalancing state to a 
great extent, but it doesn't completely remove it.

KAFKA-6031 is to allow reads from standby replicas, but it also doesn't 
completely eliminate the need to access state during rebalancing. What if there 
is no replica and the primary goes down? What if one of the replicas doesn't get 
promoted to active? What if all the replicas of the partition are in the 
rebalancing state at the same time?

KAFKA-6144 captures the idea of this ticket, i.e. to allow access to the state 
store during rebalancing, but KAFKA-6145 & KAFKA-6031 are not sufficient to 
achieve that.

 





[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361589#comment-16361589
 ] 

Matthias J. Sax commented on KAFKA-6555:


You mean, you are preparing a PR? Great! 

I would still like to merge both tickets. Can you update the other ticket (or 
comment on it) and close this ticket as a duplicate? 



[jira] [Commented] (KAFKA-5944) Add unit tests for handling of authentication failures in clients

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361546#comment-16361546
 ] 

ASF GitHub Bot commented on KAFKA-5944:
---

hachikuji closed pull request #3965: KAFKA-5944: Unit tests for handling SASL 
authentication failures in clients
URL: https://github.com/apache/kafka/pull/3965
 
 
   


diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414fd7a..65255fe9648 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public FutureResponse(Node node,
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -102,7 +104,7 @@ public boolean isReady(Node node, long now) {
 
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node))
+        if (isBlackedOut(node) || authenticationException(node) != null)
             return false;
         ready.add(node.idString());
         return true;
@@ -117,6 +119,12 @@ public void blackout(Node node, long duration) {
         blackedOut.put(node, time.milliseconds() + duration);
     }
 
+    public void authenticationFailed(Node node, long duration) {
+        authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
+        disconnect(node.idString());
+        blackout(node, duration);
+    }
+
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public boolean connectionFailed(Node node) {
 
     @Override
     public AuthenticationException authenticationException(Node node) {
-        return null;
+        return authenticationException.get(node);
     }
 
     @Override
@@ -347,6 +355,7 @@ public void reset() {
         responses.clear();
         futureResponses.clear();
         metadataUpdates.clear();
+        authenticationException.clear();
     }
 
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf06cb5..f08a99b6ddc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public void testInvalidTopicNames() throws Exception {
         }
     }
 
+    @Test
+    public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
+        AdminClientUnitTestEnv env = mockClientEnv();
+        Node node = env.cluster().controller();
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+        env.kafkaClient().setNode(node);
+        env.kafkaClient().authenticationFailed(node, 300);
+
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+
+        // wait less than the blackout period, the connection should fail and the authentication error should remain
+        env.time().sleep(30);
+        assertTrue(env.kafkaClient().connectionFailed(node));
+
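
The blackout logic that the MockClient part of this diff relies on can be sketched in isolation. This is a simplified illustration, not the Kafka test class: `BlackoutSketch` is a hypothetical name, nodes are identified by plain strings, a manually advanced counter stands in for the mock clock, and `SecurityException` stands in for Kafka's `AuthenticationException`.

```java
import java.util.HashMap;
import java.util.Map;

final class BlackoutSketch {
    private final Map<String, Long> blackedOutUntil = new HashMap<>();
    private final Map<String, RuntimeException> authErrors = new HashMap<>();
    private long nowMs = 0; // manually advanced clock, standing in for a mock Time

    /** Advances the fake clock. */
    void sleep(long ms) {
        nowMs += ms;
    }

    /** Records an authentication failure and blacks the node out for durationMs. */
    void authenticationFailed(String nodeId, long durationMs) {
        authErrors.put(nodeId, new SecurityException("authentication failed"));
        blackedOutUntil.put(nodeId, nowMs + durationMs);
    }

    /** Returns the recorded authentication error for the node, or null if there is none. */
    RuntimeException authenticationException(String nodeId) {
        return authErrors.get(nodeId);
    }

    /** True while the blackout deadline has not passed; expired entries are dropped. */
    boolean isBlackedOut(String nodeId) {
        Long until = blackedOutUntil.get(nodeId);
        if (until == null) {
            return false;
        }
        if (nowMs > until) {
            blackedOutUntil.remove(nodeId);
            return false;
        }
        return true;
    }

    /** Mirrors the patched ready(): not ready while blacked out or while an auth error is recorded. */
    boolean ready(String nodeId) {
        return !isBlackedOut(nodeId) && authenticationException(nodeId) == null;
    }
}
```

As in the diff, where only `reset()` clears the map, the recorded error in this sketch outlives the blackout window, so `ready` stays false for that node even after the deadline has passed.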

[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361515#comment-16361515
 ] 

Ashish Surana commented on KAFKA-6555:
--

Hi Matthias,

  Thanks for pointing to KAFKA-6144. Going through it, it looks like a very similar 
ticket, but with a minor difference:
 * I am suggesting to allow stale reads only from PARTITION_ASSIGNED (not from 
PARTITION_REVOKED), primarily because that instance is going to be the one in the 
RUNNING state, and this is the minimum we need to do to keep serving requests for 
this partition. We still have one instance doing writes and reads and still want 
to have pure standby replicas. This approach is good if we continue with the 
current design of one active write/read instance.

I have made the changes and can share them in a few days.



[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6555:
-
Description: 
State stores in Kafka Streams are currently queryable only when the StreamTask is in 
the RUNNING state. The idea is to make them queryable even in the RESTORATION 
(PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
making the data inaccessible during this time could mean downtime that is not 
acceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica task 
has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how far the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable, effectively making the partition unavailable. We can make the 
state store partition queryable for the data already present in the state store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store non-queryable during this time means 
downtime for the partition.

It's a very important improvement as it could improve the availability of 
microservices developed using Kafka Streams.

I am working on a patch for this change. Any feedback or comments are welcome.

 

 

  was:
State stores in Kafka Streams are currently queryable only when the StreamTask is in 
the RUNNING state. The idea is to make them queryable even in the RESTORATION 
(PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and 
making the data inaccessible during this time could mean downtime that is not 
acceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica task 
has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how far the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable, effectively making the partition unavailable. We can make the 
state store partition queryable for the data already present in the state store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store non-queryable during this time means 
downtime for the partition.

It's a very important improvement as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.

 

 


> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in the RUNNING state. The idea is to make them queryable even 
> in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on 
> restoration can be huge, and making the data inaccessible during this time 
> can mean downtime that is unacceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it 
> can become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, which makes the partition unavailable. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from 
> the changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this 
> time means downtime for the partition.
> It's a very important improvement, as it could improve the availability of 
> microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361495#comment-16361495
 ] 

Matthias J. Sax commented on KAFKA-6535:


OK, sounds good. There are a couple of other scenarios, too, but I guess we 
consider all of them missing features or bugs and thus plan to resolve them 
in the future. Thanks for clarifying.

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, so that records with 
> old timestamps can still be pushed to them (think: bootstrapping, 
> re-processing), and just rely on the purging API to keep their storage small.
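
For illustration only, the topic-level setting under discussion can be sketched as below. `retention.ms` is the standard Kafka topic config, and a value of `-1` disables time-based retention. The class and method names are hypothetical, and how Streams would actually apply this default to its repartition topics is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

public class RepartitionTopicDefaults {
    // Hypothetical helper: topic-level overrides for a repartition topic.
    static Map<String, String> repartitionTopicConfig() {
        Map<String, String> config = new HashMap<>();
        // "retention.ms" = -1 means there is no time-based retention limit,
        // so records with old timestamps are not dropped because of their age.
        config.put("retention.ms", "-1");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(repartitionTopicConfig().get("retention.ms"));
    }
}
```

With time-based retention switched off, keeping the topic small depends entirely on the purging API mentioned in the description.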





[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361492#comment-16361492
 ] 

Matthias J. Sax commented on KAFKA-6555:


[~asurana] Thanks for creating this JIRA. How is this different from KAFKA-6144?

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in the RUNNING state. The idea is to make them queryable even 
> in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on 
> restoration can be huge, and making the data inaccessible during this time 
> can mean downtime that is unacceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it 
> can become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, which makes the partition unavailable. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from 
> the changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this 
> time means downtime for the partition.
> It's a very important improvement, as it could improve the availability of 
> microservices developed using Kafka Streams. Any feedback or comments are 
> welcome.
>  
>  





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-12 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361490#comment-16361490
 ] 

Guozhang Wang commented on KAFKA-6535:
--

I think it may bring more operational risk than benefit in practice: knowing 
for sure that a topic is not used by any other team is hard, so opening this 
door and allowing users to do this, perhaps mistakenly, may lead to more issues.

Instead, we should just encourage users to use internal topics when possible, 
unless there are reasons not to. So far we have the following reasons, which I 
think we can fix in the near future, so encouraging internal topics would still 
be a good approach moving forward:

1. Users want to override {{num.partitions}} and other configs for individual 
topics; this is being discussed in some KIPs already.
2. Users want to maintain the topic names for extensibility (upgrading the app 
without restarting a new app, for example); this is also being discussed, to be 
better supported in the future.

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, so that records with 
> old timestamps can still be pushed to them (think: bootstrapping, 
> re-processing), and just rely on the purging API to keep their storage small.





[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified

2018-02-12 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191405#comment-16191405
 ] 

Ted Yu edited comment on KAFKA-5846 at 2/12/18 8:49 PM:


Patch looks good to me .


was (Author: yuzhih...@gmail.com):
Patch looks good to me.

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where 
> listener is not specified
> ---
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> Currently KafkaConsumer creates a new instance of 
> NoOpConsumerRebalanceListener for each subscribe() call where a 
> ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
>     subscribe(pattern, new NoOpConsumerRebalanceListener());
> }
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such 
> scenarios.
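
A minimal sketch of the singleton idea, using stand-in types rather than the real Kafka classes: `RebalanceListener`, `NoOpRebalanceListener` and `Consumer` below are hypothetical and only mirror the shape of `ConsumerRebalanceListener`, `NoOpConsumerRebalanceListener` and `KafkaConsumer`. It is not the actual patch. The point is that a stateless no-op listener can be shared safely by every `subscribe()` call.

```java
import java.util.Collection;
import java.util.regex.Pattern;

public class SingletonListenerSketch {
    /** Stand-in for ConsumerRebalanceListener; not the real Kafka interface. */
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    /** Stateless no-op listener, so one shared instance is safe to reuse. */
    static final class NoOpRebalanceListener implements RebalanceListener {
        static final NoOpRebalanceListener INSTANCE = new NoOpRebalanceListener();

        private NoOpRebalanceListener() { }

        @Override public void onPartitionsRevoked(Collection<String> partitions) { }
        @Override public void onPartitionsAssigned(Collection<String> partitions) { }
    }

    /** Stand-in for the consumer; it only records which listener was registered. */
    static final class Consumer {
        RebalanceListener listener;

        void subscribe(Pattern pattern) {
            // Reuse the shared instance instead of allocating one per call.
            subscribe(pattern, NoOpRebalanceListener.INSTANCE);
        }

        void subscribe(Pattern pattern, RebalanceListener listener) {
            this.listener = listener;
        }
    }

    public static void main(String[] args) {
        Consumer a = new Consumer();
        Consumer b = new Consumer();
        a.subscribe(Pattern.compile("topic-.*"));
        b.subscribe(Pattern.compile("other-.*"));
        System.out.println(a.listener == b.listener); // prints "true"
    }
}
```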





[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6555:
-
Description: 
State stores in Kafka Streams are currently only queryable when the StreamTask 
is in the RUNNING state. The idea is to make them queryable even in the 
RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be 
huge, and making the data inaccessible during this time can mean downtime that 
is unacceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica 
task has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how far the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable, which makes the partition unavailable. We can make 
the state store partition queryable for the data already present in the state 
store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store not queryable during this time 
means downtime for the partition.

It's a very important improvement, as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.

 

 

  was:
State stores in Kafka Streams are currently only queryable when the StreamTask 
is in the RUNNING state. The idea is to make them queryable even in the 
RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be 
huge, and making the data inaccessible during this time can mean downtime that 
is unacceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica 
task has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how far the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable, which makes the partition unavailable. We can make 
the state store partition queryable for the data already present in the state 
store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store not queryable during this time 
means downtime for the partition.

It's a very important improvement, as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.

 

 


> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in the RUNNING state. The idea is to make them queryable even 
> in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on 
> restoration can be huge, and making the data inaccessible during this time 
> can mean downtime that is unacceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it 
> can become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, which makes the partition unavailable. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from 
> the changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this 
> time means downtime for the partition.
> It's a very important improvement, as it could improve the availability of 
> microservices developed using Kafka Streams. Any feedback or comments are 
> welcome.
>  
>  





[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6555:
-
Description: 
State stores in Kafka Streams are currently only queryable when the StreamTask 
is in the RUNNING state. The idea is to make them queryable even in the 
RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be 
huge, and making the data inaccessible during this time can mean downtime that 
is unacceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica 
task has to restore the remaining state from the changelog topic before it can 
become RUNNING. The time taken for this depends on how far the replica is 
lagging behind. During this restoration time the state store for that partition 
is currently not queryable, which makes the partition unavailable. We can make 
the state store partition queryable for the data already present in the state 
store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store not queryable during this time 
means downtime for the partition.

It's a very important improvement, as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.

 

 

  was:
State stores in Kafka Streams are currently only queryable when the StreamTask 
is in the RUNNING state. The idea is to make them queryable even in the 
RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be 
huge, and making the data inaccessible during this time can mean downtime that 
is unacceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica 
restores the remaining state from the changelog topic. The time taken for this 
depends on how far the replica is lagging behind in consuming the changelog 
topic. During this restoration time the state store for that partition is not 
queryable, giving us downtime for the entire partition. We can make it 
queryable for the data already present in the state store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store not queryable during this time 
means downtime for the partition.

It's a very important improvement, as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.

 

 


> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in the RUNNING state. The idea is to make them queryable even 
> in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on 
> restoration can be huge, and making the data inaccessible during this time 
> can mean downtime that is unacceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it 
> can become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, which makes the partition unavailable. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from 
> the changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this 
> time means downtime for the partition.
>  
> It's a very important improvement, as it could improve the availability of 
> microservices developed using Kafka Streams. Any feedback or comments are 
> welcome.
>  
>  





[jira] [Created] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)
Ashish Surana created KAFKA-6555:


 Summary: Making state store queryable during restoration
 Key: KAFKA-6555
 URL: https://issues.apache.org/jira/browse/KAFKA-6555
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Ashish Surana


State stores in Kafka Streams are currently only queryable when the StreamTask 
is in the RUNNING state. The idea is to make them queryable even in the 
RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can be 
huge, and making the data inaccessible during this time can mean downtime that 
is unacceptable for many applications.

When the active partition goes down, one of the following occurs:
 # One of the standby replica partitions gets promoted to active: the replica 
restores the remaining state from the changelog topic. The time taken for this 
depends on how far the replica is lagging behind in consuming the changelog 
topic. During this restoration time the state store for that partition is not 
queryable, giving us downtime for the entire partition. We can make it 
queryable for the data already present in the state store.
 # When there is no replica or standby task, the active task will be started 
on one of the existing nodes. That node has to build the entire state from the 
changelog topic, which can take a lot of time depending on how big the 
changelog topic is, and keeping the state store not queryable during this time 
means downtime for the partition.

It's a very important improvement, as it could improve the availability of 
microservices developed using Kafka Streams. Any feedback or comments are 
welcome.
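
To make the downtime concrete: while a store is restoring, a caller today typically has to retry its lookup until the task reaches RUNNING. The sketch below shows that waiting pattern with stand-in types only. `StoreNotQueryableException` and the `Supplier` lookup are hypothetical placeholders for the exception and store access a real Kafka Streams application would use; nothing here is the proposed change itself.

```java
import java.util.function.Supplier;

public class QueryDuringRestoreSketch {
    /** Stand-in for the error a store lookup raises while the store is restoring. */
    static final class StoreNotQueryableException extends RuntimeException { }

    /** Retries the lookup up to maxAttempts times, sleeping between attempts. */
    static <T> T queryWithRetry(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return lookup.get();
            } catch (StoreNotQueryableException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the store stayed unavailable
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated store that only becomes queryable on the third attempt.
        String value = queryWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new StoreNotQueryableException();
            }
            return "value";
        }, 5, 10L);
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

Every retry in this loop is time during which the partition's data is unavailable to readers, which is the gap the proposal aims to close by serving the data already present in the store.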

 

 





[jira] [Updated] (KAFKA-6555) Making state store queryable during restoration

2018-02-12 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6555:
-
Flags: Important

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in the RUNNING state. The idea is to make them queryable even 
> in the RESTORATION (PARTITION_ASSIGNED) state, as the time spent on 
> restoration can be huge, and making the data inaccessible during this time 
> can mean downtime that is unacceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> restores the remaining state from the changelog topic. The time taken for 
> this depends on how far the replica is lagging behind in consuming the 
> changelog topic. During this restoration time the state store for that 
> partition is not queryable, giving us downtime for the entire partition. We 
> can make it queryable for the data already present in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from 
> the changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store not queryable during this 
> time means downtime for the partition.
>  
> It's a very important improvement, as it could improve the availability of 
> microservices developed using Kafka Streams. Any feedback or comments are 
> welcome.
>  
>  





[jira] [Created] (KAFKA-6554) Broker doesn't reject Produce request with inconsistent state

2018-02-12 Thread Simon Fell (JIRA)
Simon Fell created KAFKA-6554:
-

 Summary: Broker doesn't reject Produce request with inconsistent 
state
 Key: KAFKA-6554
 URL: https://issues.apache.org/jira/browse/KAFKA-6554
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 1.0.0
Reporter: Simon Fell
 Attachments: produce_v3.txt

Produce messages of type v3 have offset deltas in each record, along with a 
LastOffsetDelta for the topic/partition set. While investigating an issue with 
missing offsets, I found a bug in a producer library where it would send 
multiple records but leave LastOffsetDelta at 0. This causes various problems, 
including holes in the offsets fetched by the consumer.

As LastOffsetDelta can be computed by looking at the records, it seems like the 
broker should at least validate the LastOffsetDelta field against the contained 
records to stop this bad data getting in.
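
A rough sketch of the kind of check being suggested, assuming the batch has already been decoded into its header value and the per-record offset deltas. The class, method and parameter names are hypothetical, and this is not broker code. It treats the largest per-record delta as the expected LastOffsetDelta and, as a further assumption, rejects an empty batch.

```java
import java.util.Arrays;
import java.util.List;

public class LastOffsetDeltaCheck {
    /**
     * Returns true when the batch-level lastOffsetDelta matches the largest
     * per-record offset delta. An empty batch is treated as inconsistent here.
     */
    static boolean isConsistent(int lastOffsetDelta, List<Integer> recordOffsetDeltas) {
        if (recordOffsetDeltas.isEmpty()) {
            return false;
        }
        int max = recordOffsetDeltas.get(0);
        for (int delta : recordOffsetDeltas) {
            max = Math.max(max, delta);
        }
        return max == lastOffsetDelta;
    }

    public static void main(String[] args) {
        List<Integer> deltas = Arrays.asList(0, 1, 2);
        System.out.println(isConsistent(2, deltas)); // true
        System.out.println(isConsistent(0, deltas)); // false: the case described above
    }
}
```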

I've attached a decoded v3 produce message that was causing the problems and 
was accepted by the broker.

Here's a link to the issue in the kafka library we were using which has more 
context if you need it.

https://github.com/Shopify/sarama/issues/1032

 

 





[jira] [Resolved] (KAFKA-6552) “entity_type” not exactly in description of kafka-configs.sh

2018-02-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6552.
--
   Resolution: Fixed
 Assignee: Xin
Fix Version/s: 1.2.0

> “entity_type” not exactly  in description of kafka-configs.sh
> -
>
> Key: KAFKA-6552
> URL: https://issues.apache.org/jira/browse/KAFKA-6552
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.2
>Reporter: Xin
>Assignee: Xin
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The description of the command option “--add-config” refers to 
> “entity_type” in several places, but the actual command option of 
> kafka-configs.sh is “--entity-type”.





[jira] [Commented] (KAFKA-6552) “entity_type” not exactly in description of kafka-configs.sh

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361153#comment-16361153
 ] 

ASF GitHub Bot commented on KAFKA-6552:
---

guozhangwang closed pull request #4556: KAFKA-6552:“entity_type” not exactly  
in description of kafka-configs.sh
URL: https://github.com/apache/kafka/pull/4556
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index b18dcc99937..dfaaa51139c 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -296,10 +296,10 @@ object ConfigCommand extends Config {
 
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
-            "For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity-type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity-type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity-type '" + ConfigType.User + "': " + DynamicConfig.User.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity-type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
             s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])


 




> “entity_type” not exactly  in description of kafka-configs.sh
> -
>
> Key: KAFKA-6552
> URL: https://issues.apache.org/jira/browse/KAFKA-6552
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.2
>Reporter: Xin
>Priority: Trivial
>
> The description of the command option “--add-config” refers to 
> “entity_type” in several places, but the actual command option of 
> kafka-configs.sh is “--entity-type”.





[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset

2018-02-12 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6367:
-
Fix Version/s: (was: 1.0.2)
   1.0.1

> Fix StateRestoreListener To Use Correct Batch Ending Offset
> ---
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.1.0, 1.0.1
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> long as the batch ending offset, but {{nextPosition}} is not correct for 
> this: the batch ending offset should be the offset of the last restored 
> record, whereas {{nextPosition}} is the offset of the first record that has 
> not been restored yet.
> We can't automatically use {{nextPosition}} - 1, as this could be a commit 
> marker.
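
A small worked example of why {{nextPosition}} - 1 is not a safe substitute, assuming a restore batch whose records sit at offsets 10 to 12 and are followed by a commit marker at offset 13. The class and method names are hypothetical and the offsets are made up for illustration; this is not the actual fix.

```java
import java.util.Arrays;
import java.util.List;

public class BatchEndOffsetSketch {
    /** Offset of the last record that was actually restored in the batch. */
    static long batchEndOffset(List<Long> restoredRecordOffsets) {
        if (restoredRecordOffsets.isEmpty()) {
            throw new IllegalArgumentException("empty batch");
        }
        return restoredRecordOffsets.get(restoredRecordOffsets.size() - 1);
    }

    public static void main(String[] args) {
        // Records at offsets 10..12, a commit marker at 13, so the next position is 14.
        List<Long> restored = Arrays.asList(10L, 11L, 12L);
        long nextPosition = 14L;
        System.out.println(batchEndOffset(restored)); // 12
        System.out.println(nextPosition - 1);         // 13, which is the commit marker
    }
}
```

Taking the ending offset from the restored records themselves sidesteps the marker problem, at the cost of having to track the last record in each batch.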





[jira] [Updated] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-12 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6504:
-
Fix Version/s: 1.0.1

> Connect: Some per-task-metrics not working
> --
>
> Key: KAFKA-6504
> URL: https://issues.apache.org/jira/browse/KAFKA-6504
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Assignee: Robert Yokota
>Priority: Minor
> Fix For: 1.1.0, 1.0.1
>
>
> Some Kafka Connect metrics seem to be wrong with respect to per-task 
> reporting. At least, it seems that the MBean 
> "kafka.connect:type=source-task-metrics,connector=,task=x" 
> attribute "source-record-active-count" reports the same number for all x 
> tasks running in the same Kafka Connect instance/JVM. E.g. if I have a 
> source connector "my-connector" with 2 tasks that both run in the same 
> Kafka Connect instance, but I know that only one of them actually produces 
> anything (and therefore can have "active source-records"), both 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and 
> "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go 
> up (following each other). The value should only go up for the one task that 
> actually produces something.





[jira] [Commented] (KAFKA-5550) Struct.put() should include the field name if validation fails

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16361101#comment-16361101
 ] 

ASF GitHub Bot commented on KAFKA-5550:
---

hachikuji closed pull request #3507: KAFKA-5550 
URL: https://github.com/apache/kafka/pull/3507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
index 6e7b5d23bbd..5e4de21b632 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
@@ -211,7 +211,9 @@ public Struct put(String fieldName, Object value) {
      * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
      */
     public Struct put(Field field, Object value) {
-        ConnectSchema.validateValue(field.schema(), value);
+        if (null == field)
+            throw new DataException("field cannot be null.");
+        ConnectSchema.validateValue(field.name(), field.schema(), value);
         values[field.index()] = value;
         return this;
     }
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index 42345b1986c..e91960576a1 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -266,4 +266,28 @@ public void testValidateFieldWithInvalidValueType() {
         thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"");
         ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object());
     }
+
+    @Test
+    public void testPutNullField() {
+        final String fieldName = "fieldName";
+        Schema testSchema = SchemaBuilder.struct()
+            .field(fieldName, Schema.STRING_SCHEMA);
+        Struct struct = new Struct(testSchema);
+
+        thrown.expect(DataException.class);
+        Field field = null;
+        struct.put(field, "valid");
+    }
+
+    @Test
+    public void testInvalidPutIncludesFieldName() {
+        final String fieldName = "fieldName";
+        Schema testSchema = SchemaBuilder.struct()
+            .field(fieldName, Schema.STRING_SCHEMA);
+        Struct struct = new Struct(testSchema);
+
+        thrown.expect(DataException.class);
+        thrown.expectMessage("Invalid value: null used for required field: \"fieldName\", schema type: STRING");
+        struct.put(fieldName, null);
+    }
 }


 




> Struct.put() should include the field name if validation fails
> --
>
> Key: KAFKA-5550
> URL: https://issues.apache.org/jira/browse/KAFKA-5550
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> When calling struct.put() with an invalid value, the error message should 
> include the field name.
> {code:java}
> @Test
> public void testPutIncludesFieldName() {
> final String fieldName = "fieldName";
> Schema testSchema = SchemaBuilder.struct()
> .field(fieldName, Schema.STRING_SCHEMA);
> Struct struct = new Struct(testSchema);
> try {
> struct.put(fieldName, null);
> } catch (DataException ex) {
> assertEquals(
> "Invalid value: null used for required field: \"fieldName\", schema type: STRING",
> ex.getMessage()
> );
> }
> }
> {code}
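
The behaviour requested above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Connect API: `FieldNameInErrorSketch` and `ValidationException` are hypothetical stand-ins for `Struct` and `DataException`, and only required string fields are modelled. The message text follows the wording quoted in the issue.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a struct with required STRING fields (not the Connect API).
final class FieldNameInErrorSketch {

    // Hypothetical exception type playing the role of Connect's DataException.
    static final class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    private final Set<String> requiredStringFields;
    private final Map<String, Object> values = new HashMap<>();

    FieldNameInErrorSketch(Set<String> requiredStringFields) {
        this.requiredStringFields = requiredStringFields;
    }

    // Validates the value and names the offending field in every error message.
    FieldNameInErrorSketch put(String fieldName, Object value) {
        if (!requiredStringFields.contains(fieldName)) {
            throw new ValidationException("Unknown field: \"" + fieldName + "\"");
        }
        if (value == null) {
            throw new ValidationException("Invalid value: null used for required field: \""
                    + fieldName + "\", schema type: STRING");
        }
        if (!(value instanceof String)) {
            throw new ValidationException("Invalid Java object for schema type STRING: "
                    + value.getClass() + " for field: \"" + fieldName + "\"");
        }
        values.put(fieldName, value);
        return this;
    }

    Object get(String fieldName) {
        return values.get(fieldName);
    }

    public static void main(String[] args) {
        FieldNameInErrorSketch struct =
                new FieldNameInErrorSketch(Collections.singleton("fieldName"));
        try {
            struct.put("fieldName", null);
            // Reached only if validation did not reject the null value.
            throw new AssertionError("expected a ValidationException");
        } catch (ValidationException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Note that the `main` method fails explicitly when no exception is thrown; a test that only asserts inside the `catch` block would pass silently in that case.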





[jira] [Created] (KAFKA-6553) Consumer consumed committed messages

2018-02-12 Thread Orel Shai (JIRA)
Orel Shai created KAFKA-6553:


 Summary: Consumer consumed committed messages
 Key: KAFKA-6553
 URL: https://issues.apache.org/jira/browse/KAFKA-6553
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.2.0
Reporter: Orel Shai


Hi,
We're using the Kafka consumer client 0.10.2.0 (running against a Kafka 
0.10.0 broker) with the following configuration:
{code:java}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
{code}
As you can see, we're using autocommit.

The consumer API version that we're using has a dedicated thread for doing 
autocommit, so an autocommit happens every second, which means that we also 
have a heartbeat every second.

For some reason we're receiving the same messages many times.

While looking at our logs I can see the following:
{code:java}
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset 
352878
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset 
352458
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset 
353775
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset 
352171
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset 
352995
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset 
352531
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for 
partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset 
351893
2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since 
the current position is 352205
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since 
the current position is 351929
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-20 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-2 since it is no 
longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched 
records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-14 since it is no 
longer fetchable
{code}
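
As a rough aid for reading the log above, the sketch below computes the gap between the committed offset and the current position for the two partitions where both values appear. It assumes, without confirmation from the report, that records in this gap are the ones delivered again if consumption resumes from the committed offset. `RedeliveryGapSketch` and `redeliveredRecords` are hypothetical names; the offsets are copied from the DEBUG lines.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RedeliveryGapSketch {

    // Records that would be read a second time if the consumer had already
    // advanced to `position` and then restarted from `committedOffset`.
    static long redeliveredRecords(long committedOffset, long position) {
        return Math.max(0L, position - committedOffset);
    }

    public static void main(String[] args) {
        // {committed offset, current position}, taken from the DEBUG lines above.
        Map<String, long[]> partitions = new LinkedHashMap<>();
        partitions.put("UpdateNodeTopic-23", new long[] {352171L, 352205L});
        partitions.put("UpdateNodeTopic-11", new long[] {351893L, 351929L});

        for (Map.Entry<String, long[]> entry : partitions.entrySet()) {
            long gap = redeliveredRecords(entry.getValue()[0], entry.getValue()[1]);
            System.out.println(entry.getKey() + ": " + gap
                    + " records between committed offset and position");
        }
    }
}
```

For these two partitions the gaps are 34 and 36 records. Whether this gap explains the duplicates seen in practice is not established by the log alone.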
Consumer connection log:
{code:java}
2018-02-12 08:18:13,506 DEBUG [DefaultThreadPool-9] Starting the Kafka consumer
2018-02-12 08:18:13,507 INFO [DefaultThreadPool-9] ConsumerConfig values: 
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [list of servers]
check.crcs = true
client.id = 2cd03a2b-f040-4f7f-b20c-ce3fe5efbe00
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800

[jira] [Commented] (KAFKA-6476) Document dynamic config update

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360908#comment-16360908
 ] 

ASF GitHub Bot commented on KAFKA-6476:
---

rajinisivaram opened a new pull request #4558: KAFKA-6476: Documentation for 
dynamic broker configuration
URL: https://github.com/apache/kafka/pull/4558
 
 
   Docs for dynamic broker configuration (KIP-226).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Document dynamic config update
> --
>
> Key: KAFKA-6476
> URL: https://issues.apache.org/jira/browse/KAFKA-6476
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, documentation
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> Add documentation for dynamic broker config update.
> Include:
>   - Command line options for kafka-configs.sh with examples
>   - Configs that can be updated along with constraints applied
>   - Secret rotation for password encoder
> Also add a new column for broker configs to indicate which configs can be 
> dynamically updated.





[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-12 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360879#comment-16360879
 ] 

Manikumar commented on KAFKA-6549:
--

Below is the sequence of steps that led to the deadlock. This is a rare 
scenario. I will try to come up with a solution.
 # The controller ZK node deletion and ZK session expiration events happen in 
quick succession.
 # For the controller ZK node deletion, 
ControllerChangeHandler.handleDeletion() is called. A Controller.Reelect event 
is added to the queue and the Reelect process is initiated.
 # For the ZK session expiration, ZooKeeperClientWatcher takes the WriteLock and 
calls the controller's StateChangeHandler.beforeInitializingSession(). 
beforeInitializingSession() adds an Expire event to the controller queue and 
waits for that event to complete.
 # Controller.Reelect waits for the ZooKeeperClient ReadLock, which cannot be 
granted while the WriteLock is held.
 # ZooKeeperClientWatcher waits for the Expire event to complete, which in turn 
waits in the controller queue behind Reelect.
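
The cycle in the five steps above can be reproduced in miniature. The following is a simplified sketch using plain `java.util.concurrent` primitives, not the Kafka code: `ControllerDeadlockSketch`, `initializationLock`, and `eventQueue` are hypothetical names, a single-thread executor stands in for the controller event thread, and both waits are bounded so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ControllerDeadlockSketch {

    // Returns true if the Expire event completed while the write lock was still held.
    static boolean expireProcessedWhileLocked(long waitMillis) {
        ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
        // A single thread processes events strictly in order, like an event queue.
        ExecutorService eventQueue = Executors.newSingleThreadExecutor();
        CountDownLatch expireProcessed = new CountDownLatch(1);

        // Step 3: the watcher takes the write lock.
        initializationLock.writeLock().lock();
        try {
            // Steps 2 and 4: Reelect is queued first and needs the read lock.
            eventQueue.execute(() -> {
                try {
                    // Bounded wait so the demo ends; an unbounded wait would hang here.
                    if (initializationLock.readLock().tryLock(10 * waitMillis, TimeUnit.MILLISECONDS)) {
                        initializationLock.readLock().unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Step 3 (continued): Expire is queued behind Reelect.
            eventQueue.execute(expireProcessed::countDown);
            // Step 5: the watcher waits for Expire while still holding the write lock.
            return expireProcessed.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for Expire", e);
        } finally {
            // Releasing the write lock lets Reelect, and then Expire, finish.
            initializationLock.writeLock().unlock();
            eventQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean processed = expireProcessedWhileLocked(200);
        System.out.println(processed
                ? "Expire processed"
                : "Expire stuck behind Reelect: with unbounded waits this is a deadlock");
    }
}
```

With unbounded waits, neither thread could make progress: the queue thread needs the read lock, and the lock holder needs the queue thread to reach the Expire event.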

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single-node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. The stack trace is attached.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
>