[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415012#comment-16415012
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4746: KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4746
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1f5140b10c8..77123ff8093 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,48 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname 
$0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+else
+  for file in 
"$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index d221d965c5e..2a540211018 100644
--- a/build.gradle
+++ b/build.gradle
@@ -776,6 +776,19 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 93b92bb52a3..dbbb9127199 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -918,7 +918,7 @@ public void onFailure(RuntimeException e) {
 log.error("Unexpected interrupt received in heartbeat thread 
for group {}", groupId, e);
 this.failed.set(new RuntimeException(e));
 } catch (RuntimeException e) {
-log.error("Heartbeat thread for group {} failed due to 
unexpected error" , groupId, e);
+log.error("Heartbeat thread for group {} failed due to 
unexpected error", groupId, e);
 this.failed.set(e);
 }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..74887483354 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public int hashCode() {
 Field f = this.schema.get(i);
 if (f.type() instanceof ArrayOf) {
 if (this.get(f) != null) {
-Object[] arrayObject = (Object []) this.get(f);
+Object[] arrayObject = (Object[]) this.get(f);
 for (Object arrayItem: arrayObject)
 result = prime * result + arrayItem.hashCode();
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 

[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414976#comment-16414976
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4779: KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4779
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 

[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414965#comment-16414965
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4761:  KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4761
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fe6aefd7321..8e2ba91bf2b 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname 
$0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # 
remove last char, ie, bug-fix number
+  for file in 
"$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index ce4b4e44cb2..17f3e00358d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -909,6 +909,42 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+  dependencies {
+testCompile libs.kafkaStreams_0102
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 7111bad6054..7102414628a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -25,10 +27,7 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL 
mechanism
diff --git a/docs/streams/upgrade-guide.html 

[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-26 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414951#comment-16414951
 ] 

Srinivas Dhruvakumar commented on KAFKA-6666:
-

[~huxi_2b] any update on the bug? Did we reach a conclusion on the issue? 

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> Currently we are seeing a few underreplicated partitions on our test cluster 
> which is used for integration testing. On debugging further, we found that the 
> replica thread was stopped due to an error: 
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414777#comment-16414777
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4758: KAFKA-6054: Fix upgrade path from Kafka 
Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index af10f61b5c4..a25868125e9 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname 
$0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
 CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in 
"$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in 
"$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # 
remove last char, ie, bug-fix number
+  for file in 
"$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+if should_include_file "$file"; then
+  CLASSPATH="$CLASSPATH":"$file"
+fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index 20a184c437c..5e97f901cb6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -770,6 +770,30 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+dependsOn testJar
+  }
+}
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 6094b547bb7..b80dfccf3d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL 
mechanism
diff --git a/docs/streams.html b/docs/streams.html
index fe0e84ee3b7..d691e63a432 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -807,21 +807,50 @@ Upgrade Guid
 See below a complete list of 
0.10.2 API and semantical changes that allow you to advance your application 

[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414673#comment-16414673
 ] 

Cemalettin Koç commented on KAFKA-6711:
---

[~guozhang] Would you please check the implementation: 

[https://github.com/cemo/kafka/commit/0cb2482259fec897f396e8b84ffb1921c4f3f63e]

I have done something preliminary. I will add necessary tests as well after 
your guidance.

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6709) broker failed to handle request due to OOM

2018-03-26 Thread Dhruvil Shah (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414613#comment-16414613
 ] 

Dhruvil Shah commented on KAFKA-6709:
-

It's hard to estimate how much memory would be consumed by down-conversion on 
brokers - it is a function of the number of fetch requests being processed at a 
given point in time, the requested fetch size, the amount of time it takes to 
create and send out the fetch response, and subsequently the amount of time it 
takes for the JVM to garbage-collect the memory associated with responses that 
were sent out.

In general, memory usage is much higher when brokers need to perform 
down-conversion. See here for more details - 
https://kafka.apache.org/0110/documentation.html#upgrade_11_message_format

Let me know if you'd be able to provide the heap dump, which should help with 
analyzing inefficiencies in the existing scheme.
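
Not a fix proposed in this thread, but as a rough illustration of one consumer-side knob:
bounding the fetch sizes also bounds how much data the broker has to down-convert and hold
in memory per fetch response. The values below are placeholders, not recommendations:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class BoundedFetchConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");        // placeholder
        // Caps on total and per-partition fetch bytes; the failing request above asked
        // for ~60 MB per partition, which the broker must materialize on the heap when
        // it has to down-convert the records.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 8 * 1024 * 1024);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
        return props;
    }
}
{code}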

> broker failed to handle request due to OOM
> --
>
> Key: KAFKA-6709
> URL: https://issues.apache.org/jira/browse/KAFKA-6709
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.1
>Reporter: Zou Tao
>Priority: Critical
> Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, 
> normal-kafkaServer-gc.log.0.current.zip, server.properties
>
>
> I have updated to release 1.0.1.
> I set up a cluster which has four brokers.
>  You can find the server.properties in the attachment.
>  There are about 150 topics, and about 4000 partitions in total; the 
> ReplicationFactor is 2.
>  Connectors are used to write/read data to/from the brokers.
>  The connector version is 0.10.1.
>  The average message size is 500B, and there are around 6 messages per second.
>  One of the brokers keeps reporting OOM and can't handle requests like:
> [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[
> {partition=16,fetch_offset=51198,max_bytes=60728640}
> ,\{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} 
> (kafka.server.KafkaApis)
>  java.lang.OutOfMemoryError: Java heap space
>      at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>      at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>      at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
>      at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
>      at scala.Option.map(Option.scala:146)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
>      at scala.Option.flatMap(Option.scala:171)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
>      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
>      at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
>      at 
> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
>      at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
>      at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
>      at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
>      at 
> 

[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414606#comment-16414606
 ] 

Guozhang Wang commented on KAFKA-6713:
--

Hi [~cemo], I've read your code, and here are some follow-up questions / 
clarifications:

1. StreamsBuilder.globalTable expects a {{Materialized<K, V, KeyValueStore<Bytes, byte[]>>}} parameter, and hence with `Materialized.as` one 
should only pass in a {{KeyValueBytesStoreSupplier}} that generates a 
{{KeyValueStore<Bytes, byte[]>}}. So you do not need to template your 
{{DelegatingByteStore}}, but can just let it use the delegated {{KeyValueStore}} internally. Using a converter after the serde will 
unnecessarily call the serdes three times instead of once: when a key-value pair is 
passed in, you would first use the key/value serde to serialize it into bytes, 
then deserialize it in your {{DelegatingByteStore}} implementation with the 
converter into typed objects again, and then the in-memory store 
will once again serialize it into bytes before putting it into the cache.

2. In your code you are re-using the same {{DelegatingByteStore}} in your 
{{KeyValueBytesStoreSupplier}}, i.e. whenever `get()` is called it will always 
return the same store object. Is that intentional? Note that although it is fine 
for now, since we only call `get()` once across all threads for a global 
store, this is an internal implementation detail that may be changed. To be 
safer I'd suggest you generate a new object in your supplier on each `get()` 
call.

3. About your invalidation use cases, I'm not sure I can follow completely... 
could you elaborate a bit more?
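
To illustrate points 1 and 2, here is a minimal sketch of a {{KeyValueBytesStoreSupplier}}
that returns a fresh {{KeyValueStore<Bytes, byte[]>}} on every `get()` call. It delegates
to the built-in in-memory store so the sketch stays self-contained; a custom byte store
implementation would go in its place. The store name and metrics scope are placeholders:

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CategoryStoreSupplier implements KeyValueBytesStoreSupplier {

    @Override
    public String name() {
        return "category-store";               // placeholder store name
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        // A new store instance on every call (point 2 above); a custom
        // KeyValueStore<Bytes, byte[]> implementation would be returned here instead.
        return Stores.inMemoryKeyValueStore(name()).get();
    }

    @Override
    public String metricsScope() {
        return "in-memory-category";           // placeholder metrics scope
    }
}
{code}

The supplier can then be passed to `Materialized.as(supplier)` as described in point 1.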

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-26 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6711:


Assignee: Cemalettin Koç

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414569#comment-16414569
 ] 

Guozhang Wang commented on KAFKA-6711:
--

Great! I've assigned the ticket to you.

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414555#comment-16414555
 ] 

Cemalettin Koç commented on KAFKA-6711:
---

I will fix [~guozhang]. 

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414477#comment-16414477
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Today I came across another scenario which is still not very clear to me. :) 
For my use case I have chosen our "Category" entity since it is updated rarely 
and its data cardinality is suitable for a newbie Kafka user. :) 

GlobalKTable is very nice since it magically fills our in-memory stores. 
However, there are some cases where I need to invalidate computed data that is 
based on the whole contents of the store. 

Please forgive my ignorance since I have just started to use Kafka, but I 
thought it would be nice to have something like a GlobalKStream, analogous to 
KStream. :) Then I could process records and trigger invalidation whenever a 
category is updated.

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-03-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
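
A small sketch (not the actual Connect code) of the kind of guard that would avoid the
NPE: skip deserialization when the caller passes a null {{TypeReference}}, as
{{RestServer.httpRequest(..., null)}} does above. The helper name is made up for
illustration:

{code:java}
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;

final class SafeJsonRead {
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    // Returns null instead of letting a null TypeReference reach
    // TypeFactory.constructType(), where typeRef.getType() would throw the NPE.
    static <T> T readOrNull(InputStream is, TypeReference<T> type) throws IOException {
        if (type == null || is == null) {
            return null;
        }
        return JSON_SERDE.readValue(is, type);
    }
}
{code}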



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414469#comment-16414469
 ] 

Cemalettin Koç edited comment on KAFKA-6713 at 3/26/18 8:20 PM:


Hi [~guozhang], I have copied the internal TypeConverter interface and added some 
other methods for my needs. What I actually want is to use a custom in-memory 
store, and then to pass this `category in memory store` to my 
`category service`. 

 

 
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore { // 
implementation }
{code}
 

I have created an instance of `CategoryInMemoryStore` and passed into my 
StreamsBuilder as this:

 
{code:java}
public GlobalKTable categoryKGlobalTable(StreamsBuilder streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
      Materialized.as(supplier)
          .withCachingDisabled()
          .withKeySerde(Serdes.Long())
          .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the files I have attached is to create a customized 
version of the in-memory key-value store implementation. 

I have also attached my implementations which are used above. 

 

 


was (Author: cemo):
Hi [~guozhang], I have copied TypeConverter internal interface and added some 
other methods for my needs. What I want to use actually using a custom in 
memory store. Then I wanted to pass this `category in memory store` to my 
`category service`. 

 

 
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore { // 
implementation }
{code}
 

I have created an instance of `CategoryInMemoryStore` and passed into my 
StreamsBuilder as this:

 
{code:java}
public GlobalKTable categoryKGlobalTable(StreamsBuilder streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
      Materialized.as(supplier)
          .withCachingDisabled()
          .withKeySerde(Serdes.Long())
          .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the files I have attached to create a my in memory key value 
store implementation. 

I have also attached my implementations which are used above. 

 

 

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V 

[jira] [Updated] (KAFKA-6413) ReassignPartitionsCommand#parsePartitionReassignmentData() should give better error message when JSON is malformed

2018-03-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6413:
--
Description: 
In this thread: 
http://search-hadoop.com/m/Kafka/uyzND1J9Hizcxo0X?subj=Partition+reassignment+data+file+is+empty
 , Allen gave an example JSON string with extra comma where 
partitionsToBeReassigned returned by 
ReassignPartitionsCommand#parsePartitionReassignmentData() was empty.

I tried the following example where a right bracket is removed:

{code}
val (partitionsToBeReassigned, replicaAssignment) = 
ReassignPartitionsCommand.parsePartitionReassignmentData(

"{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":0,\"replicas\":[1,2]},{\"topic\":\"metrics\",\"partition\":1,\"replicas\":[2,3]},}");
{code}
The returned partitionsToBeReassigned is empty (and no exception was thrown).

The parser should give better error message for malformed JSON string.

  was:
In this thread: 
http://search-hadoop.com/m/Kafka/uyzND1J9Hizcxo0X?subj=Partition+reassignment+data+file+is+empty
 , Allen gave an example JSON string with extra comma where 
partitionsToBeReassigned returned by 
ReassignPartitionsCommand#parsePartitionReassignmentData() was empty.

I tried the following example where a right bracket is removed:
{code}
val (partitionsToBeReassigned, replicaAssignment) = 
ReassignPartitionsCommand.parsePartitionReassignmentData(

"{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":0,\"replicas\":[1,2]},{\"topic\":\"metrics\",\"partition\":1,\"replicas\":[2,3]},}");
{code}
The returned partitionsToBeReassigned is empty (and no exception was thrown).

The parser should give better error message for malformed JSON string.


> ReassignPartitionsCommand#parsePartitionReassignmentData() should give better 
> error message when JSON is malformed
> --
>
> Key: KAFKA-6413
> URL: https://issues.apache.org/jira/browse/KAFKA-6413
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>  Labels: json
>
> In this thread: 
> http://search-hadoop.com/m/Kafka/uyzND1J9Hizcxo0X?subj=Partition+reassignment+data+file+is+empty
>  , Allen gave an example JSON string with extra comma where 
> partitionsToBeReassigned returned by 
> ReassignPartitionsCommand#parsePartitionReassignmentData() was empty.
> I tried the following example where a right bracket is removed:
> {code}
> val (partitionsToBeReassigned, replicaAssignment) = 
> ReassignPartitionsCommand.parsePartitionReassignmentData(
> 
> "{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":0,\"replicas\":[1,2]},{\"topic\":\"metrics\",\"partition\":1,\"replicas\":[2,3]},}");
> {code}
> The returned partitionsToBeReassigned is empty (and no exception was thrown).
> The parser should give a better error message for a malformed JSON string.
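
As a point of comparison (not Kafka's own parser), a strict JSON parser rejects this kind
of malformed input with a precise error, which is roughly the message quality this ticket
asks for:

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class MalformedReassignmentJson {
    public static void main(String[] args) {
        // A truncated variant of the example above: the partitions array is never
        // closed and a stray comma precedes the final brace.
        String malformed =
            "{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":0,\"replicas\":[1,2]},}";
        try {
            new ObjectMapper().readTree(malformed);
        } catch (IOException e) {
            // Jackson reports the unexpected token and its offset instead of
            // silently yielding an empty result.
            System.err.println("Malformed reassignment JSON: " + e.getMessage());
        }
    }
}
{code}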



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414469#comment-16414469
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Hi [~guozhang], I have copied the internal TypeConverter interface and added some 
other methods for my needs. What I actually want is to use a custom in-memory 
store, and then to pass this `category in memory store` to my 
`category service`. 

 

 
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore { // 
implementation }
{code}
 

I have created an instance of `CategoryInMemoryStore` and passed into my 
StreamsBuilder as this:

 
{code:java}
public GlobalKTable categoryKGlobalTable(StreamsBuilder streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
      Materialized.as(supplier)
          .withCachingDisabled()
          .withKeySerde(Serdes.Long())
          .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the files I have attached is to create my own in-memory 
key-value store implementation. 

I have also attached my implementations which are used above. 

 

 

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cemalettin Koç updated KAFKA-6713:
--
Attachment: TypeConverter.java
DelegatingByteStore.java
BytesTypeConverter.java

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414323#comment-16414323
 ] 

Guozhang Wang commented on KAFKA-6713:
--

Hi [~cemo], thanks for reporting this, it is very valuable to us. What puzzles me 
is why you'd need to implement the {{TypeConverter}} interface, as it is an 
internal interface and is not supposed to be exposed to users. Could you share 
more of your code to help me understand the cumbersomeness better? Note 
that if you are using the DSL and trying to implement a customized store, you 
should only need to implement the {{KeyValueBytesStoreSupplier}} and the 
{{KeyValueStore<Bytes, byte[]>}} it generates. The serdes will be auto-handled 
by the Streams library.

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get`, such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter` and:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-26 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6711:
-
Summary: GlobalStateManagerImpl should not write offsets of in-memory 
stores in checkpoint file  (was: Checkpoint of InMemoryStore along with 
GlobalKTable )

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains the 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, and there might be another way to fix the 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6711) Checkpoint of InMemoryStore along with GlobalKTable

2018-03-26 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6711:
-
Labels: newbie  (was: )

> Checkpoint of InMemoryStore along with GlobalKTable 
> 
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains the 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, and there might be another way to fix the 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) Checkpoint of InMemoryStore along with GlobalKTable

2018-03-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414308#comment-16414308
 ] 

Guozhang Wang commented on KAFKA-6711:
--

Hello [~cemo], it is by design that in-memory state stores do not persist any 
data to the local storage engine upon closing, and hence on restart they lose 
all their data and need to re-bootstrap from the topic again. For normal state 
stores we have the check {{if (store.persistent() && 
storeToChangelogTopic.containsKey(storeName)) // write checkpoint values}}, 
but I agree with you that in the global store implementation, i.e. 
{{GlobalStateManagerImpl}}, we do not have this check, which I think is a bug. 
That is, we should do a similar check and not write the offsets of in-memory 
stores to the checkpoint file. Would you like to fix this issue?

As for your own observed issue, we do have a JIRA open for adding the 
checkpoint feature for in-memory stores so that data will not be lost upon 
closing: https://issues.apache.org/jira/browse/KAFKA-3184.
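
A rough sketch of the kind of guard being suggested for the global store path, 
mirroring the persistent-store check quoted above; the helper below is 
illustrative and not the actual {{GlobalStateManagerImpl}} code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

// Illustrative filter: only offsets of persistent stores are written, so in-memory
// global stores no longer leave stale offsets behind in the checkpoint file.
final class GlobalCheckpointSketch {

    static Map<TopicPartition, Long> offsetsToCheckpoint(final Map<StateStore, TopicPartition> storeToPartition,
                                                         final Map<TopicPartition, Long> currentOffsets) {
        final Map<TopicPartition, Long> result = new HashMap<>();
        for (final Map.Entry<StateStore, TopicPartition> entry : storeToPartition.entrySet()) {
            if (entry.getKey().persistent()) {                 // skip in-memory stores
                final Long offset = currentOffsets.get(entry.getValue());
                if (offset != null) {
                    result.put(entry.getValue(), offset);
                }
            }
        }
        return result;
    }

    private GlobalCheckpointSketch() { }
}
{code}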

> Checkpoint of InMemoryStore along with GlobalKTable 
> 
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>
> We are using an InMemoryStore along with a GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains the 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, and there might be another way to fix the 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend

2018-03-26 Thread Ted Yu (JIRA)
Ted Yu created KAFKA-6716:
-

 Summary: discardChannel should be released in 
MockSelector#completeSend
 Key: KAFKA-6716
 URL: https://issues.apache.org/jira/browse/KAFKA-6716
 Project: Kafka
  Issue Type: Test
Reporter: Ted Yu


{code}
private void completeSend(Send send) throws IOException {
    // Consume the send so that we will be able to send more requests to the destination
    ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
    while (!send.completed()) {
        send.writeTo(discardChannel);
    }
    completedSends.add(send);
}
{code}
The {{discardChannel}} should be closed before returning from the method.
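
A minimal sketch of the suggested change to the quoted method, assuming 
{{ByteBufferChannel}} is closeable via its channel interface; the eventual fix 
may differ:

{code:java}
private void completeSend(Send send) throws IOException {
    // Consume the send so that we will be able to send more requests to the destination.
    ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
    try {
        while (!send.completed()) {
            send.writeTo(discardChannel);
        }
        completedSends.add(send);
    } finally {
        // Release the channel before returning, as suggested above.
        discardChannel.close();
    }
}
{code}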



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries

2018-03-26 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6712.

Resolution: Duplicate

> Throw a specific exception with wrong topic name for interactive queries
> 
>
> Key: KAFKA-6712
> URL: https://issues.apache.org/jira/browse/KAFKA-6712
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Francesco Guardiani
>Priority: Major
>
> When you use the interactive query state stores with a wrong topic name and 
> you call the store() method, the client should throw an exception that 
> explains that you have specified a wrong topic name. Currently it throws an 
> IllegalStateStoreException "the state store may have migrated to another 
> instance.", which is too generic and is also used when the stream thread is 
> not ready.
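
For context, a minimal sketch of the call site being discussed, assuming a 
running {{KafkaStreams}} instance and an illustrative store name; the exception 
caught below is Streams' {{InvalidStateStoreException}} (which carries the 
quoted "may have migrated" message), not a reproduction of the reporter's code:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQuerySketch {
    // "no-such-store" is a placeholder for a misspelled store/topic name.
    public static void query(final KafkaStreams streams) {
        try {
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store("no-such-store", QueryableStoreTypes.<String, String>keyValueStore());
            System.out.println(store.approximateNumEntries());
        } catch (final InvalidStateStoreException e) {
            // Today the same generic exception covers both an unknown store name
            // and a store that is simply not ready yet or has migrated.
            System.err.println("Store not available: " + e.getMessage());
        }
    }
}
{code}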



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries

2018-03-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413129#comment-16413129
 ] 

Matthias J. Sax edited comment on KAFKA-6712 at 3/26/18 5:30 PM:
-

Thanks for opening a ticket for this [~slinkydeveloper]. I am wondering if it 
is a duplicate of KAFKA-5876 though?


was (Author: mjsax):
Thanks for opening a ticket for this [~slinkydeveloper]. I am wondering if is 
is a duplicate of KAFKA-5876 though?

> Throw a specific exception with wrong topic name for interactive queries
> 
>
> Key: KAFKA-6712
> URL: https://issues.apache.org/jira/browse/KAFKA-6712
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Francesco Guardiani
>Priority: Major
>
> When you use the interactive query state stores with a wrong topic name and 
> you call the store() method, the client should throw an exception that 
> explains that you have specified a wrong topic name. Currently it throws an 
> IllegalStateStoreException "the state store may have migrated to another 
> instance.", which is too generic and is also used when the stream thread is 
> not ready.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6678) Upgrade dependencies with later release versions

2018-03-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6678:
--
Description: 
{code}
The following dependencies have later release versions:
 - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
 - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
 - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
 - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
 - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
 - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
 - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
 - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
 - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
 - org.lz4:lz4-java [1.4 -> 1.4.1]
 - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
 - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
 - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
 - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
 - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
 - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
 - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
 - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
{code}

Looks like we can consider upgrading scalatest, jmh-core and checkstyle

  was:
{code}
The following dependencies have later release versions:
 - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
 - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
 - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
 - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
 - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
 - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
 - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
 - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
 - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
 - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
 - org.lz4:lz4-java [1.4 -> 1.4.1]
 - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
 - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
 - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
 - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
 - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
 - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
 - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
 - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
{code}
Looks like we can consider upgrading scalatest, jmh-core and checkstyle


> Upgrade dependencies with later release versions
> 
>
> Key: KAFKA-6678
> URL: https://issues.apache.org/jira/browse/KAFKA-6678
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Major
> Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6531) SocketServerTest#closingChannelException fails sometimes

2018-03-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6531:
--
Component/s: core

> SocketServerTest#closingChannelException fails sometimes
> 
>
> Key: KAFKA-6531
> URL: https://issues.apache.org/jira/browse/KAFKA-6531
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Ted Yu
>Priority: Minor
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/
>  :
> {code}
> java.lang.AssertionError: Channels not removed
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914)
>   at 
> kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763)
>   at 
> kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747)
> {code}
> Among the test output, I saw:
> {code}
> [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from 
> /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73)
> java.lang.IllegalStateException: There is already a connection for id 
> 127.0.0.1:1-127.0.0.1:2-0
>   at 
> org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260)
>   at org.apache.kafka.common.network.Selector.register(Selector.java:254)
>   at 
> kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043)
>   at 
> kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>   at 
> kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037)
>   at 
> kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043)
>   at 
> kafka.network.Processor.configureNewConnections(SocketServer.scala:723)
>   at kafka.network.Processor.run(SocketServer.scala:532)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5943) Reduce dependency on mock in connector tests

2018-03-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5943:
--
Description: 
Currently connector tests make heavy use of mock (easymock, power mock).

This may hide the real logic behind operations and makes finding bugs difficult.


We should reduce the use of mocks so that developers can debug connector code 
using unit tests.
This would shorten the development cycle for connector.

  was:
Currently connector tests make heavy use of mock (easymock, power mock).

This may hide the real logic behind operations and makes finding bugs difficult.

We should reduce the use of mocks so that developers can debug connector code 
using unit tests.
This would shorten the development cycle for connector.


> Reduce dependency on mock in connector tests
> 
>
> Key: KAFKA-5943
> URL: https://issues.apache.org/jira/browse/KAFKA-5943
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>  Labels: connector
>
> Currently connector tests make heavy use of mocks (EasyMock, PowerMock).
> This may hide the real logic behind operations and makes finding bugs 
> difficult.
> We should reduce the use of mocks so that developers can debug connector code 
> using unit tests.
> This would shorten the development cycle for connectors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5946) Give connector method parameter better name

2018-03-26 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392411#comment-16392411
 ] 

Ted Yu edited comment on KAFKA-5946 at 3/26/18 4:42 PM:


Thanks for taking it.


was (Author: yuzhih...@gmail.com):
Thanks for taking it .

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Tanvi Jaywant
>Priority: Major
>  Labels: connector, newbie
>
> During the development of KAFKA-5657, there were several iterations where the 
> method call didn't match what the connector parameter actually represents.
> [~ewencp] had used connType as equivalent to connClass because "Type" wasn't 
> used to differentiate source vs. sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then 
> standardize on class == java class, type == source/sink, name == 
> user-specified name.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-03-26 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 3/26/18 4:41 PM:


lgtm


was (Author: yuzhih...@gmail.com):
+1

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modifications to newChannels should be protected by a synchronized block.
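
A minimal sketch of the kind of guard being proposed, with names modeled on the 
quoted snippet; this is a suggestion, not the committed fix:

{code:java}
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Toy illustration: the acceptor thread adds channels under the list's monitor,
// and the thread that drains them synchronizes on the same monitor.
public class GuardedChannelList {
    private final List<SocketChannel> newChannels = new ArrayList<>();

    public void onAccept(final SocketChannel socketChannel) {
        synchronized (newChannels) {
            newChannels.add(socketChannel);
        }
    }

    public List<SocketChannel> drain() {
        synchronized (newChannels) {
            final List<SocketChannel> drained = new ArrayList<>(newChannels);
            newChannels.clear();
            return drained;
        }
    }
}
{code}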



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-03-26 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413904#comment-16413904
 ] 

Andrew Olson commented on KAFKA-5413:
-

{quote}Is this the same issue as was reported here?{quote}

It sounds like a related issue that may be rarer than what the correction for 
this one addressed, or a variant that eluded the bug fix here. The stack trace 
looks the same.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo
> adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 
> records of "key 1" following, wouldn't we run into this situation whenever the 
> log cleaner runs?
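
To make the arithmetic concrete, a small illustration of the overflow being 
described, using the offsets from the dump above and assuming the cleaned 
target segment keeps base offset 0:

{code:java}
public class RelativeOffsetCheck {
    public static void main(String[] args) {
        // Offsets inside a segment are stored relative to the segment's base offset
        // as 4-byte ints, so offset - baseOffset must fit into an int.
        final long baseOffset = 0L;              // base offset of the cleaned target segment
        final long largestOffset = 2147539884L;  // largest offset from the second source segment
        final long relative = largestOffset - baseOffset;
        System.out.println(relative > Integer.MAX_VALUE);  // true: 2147539884 > 2147483647
    }
}
{code}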



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6715:
--
Description: 
In our cluster we experienced a situation in which the leadership of all 
partitions led by two brokers was moved mainly to one other broker (broker 1).

We don't know why this happened. At that time there was no broker outage, nor 
had a broker shutdown been initiated. The Zookeeper nodes of the affected 
brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time.

In addition, there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well as a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that become the new leader and followers. Our log level for 
kafka.controller and the state change log is set to TRACE.

Though all brokers are running, the situation does not recover. It sticks in a 
highly imbalanced leader distribution, in which two brokers are not the leader 
for any partition and one broker is the leader for almost all partitions.
{code:java}
kafka-controller Log (Level TRACE):
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 0.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
{code}
The imbalance was recognized by the controller, but nothing happened. The 
reported imbalance for broker 1 seems to be wrong, because this broker has 
taken over the leader role for most of the partitions.

In addition, it seems that the ReplicaFetcherThreads die without any log 
message (see attached stack trace), though we think this should not be 
possible... However, we would expect log messages stating that fetchers for 
partitions have been removed, as well as that the ReplicaFetcherThreads are 
shutting down. The log level for _kafka_ is set to INFO. In other situations, 
when a broker is shut down, we see such entries in the log files.

Besides that, this caused under-replicated partitions. It seems that no broker 
fetches from the partitions with the newly assigned leaders. As with the 
highly imbalanced leader distribution, the cluster sticks in this state and 
does not recover.

This is a recurring problem; however, we cannot reproduce it.

  was:
In our cluster we experienced a situation, in which the leader of all 
partitions lead by two brokers has been moved mainly to one other broker.

We don't know why this happend. At this time there was not broker outage, nor a 
broker shutdown has been initiated. The Zookeeper nodes of the affected brokers 
(/brokers/ids/3, /brokers/ids/4) has not been modified during this time.

In addition there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that becomes the new leader and follower. Our log 

[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6715:
--
Description: 
In our cluster we experienced a situation in which the leadership of all 
partitions led by two brokers was moved mainly to one other broker.

We don't know why this happened. At that time there was no broker outage, nor 
had a broker shutdown been initiated. The Zookeeper nodes of the affected 
brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time.

In addition, there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well as a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that become the new leader and followers. Our log level for 
kafka.controller and the state change log is set to TRACE.

Though all brokers are running, the situation does not recover. It sticks in a 
highly imbalanced leader distribution, in which two brokers are not the leader 
for any partition and one broker is the leader for almost all partitions.
{code:java}
kafka-controller Log (Level TRACE):
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 0.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
{code}
The imbalance was recognized by the controller, but nothing happened.

In addition, it seems that the ReplicaFetcherThreads die without any log 
message (see attached stack trace), though we think this should not be 
possible... However, we would expect log messages stating that fetchers for 
partitions have been removed, as well as that the ReplicaFetcherThreads are 
shutting down. The log level for _kafka_ is set to INFO. In other situations, 
when a broker is shut down, we see such entries in the log files.

Besides that, this caused under-replicated partitions. It seems that no broker 
fetches from the partitions with the newly assigned leaders. As with the 
highly imbalanced leader distribution, the cluster sticks in this state and 
does not recover.

This is a recurring problem; however, we cannot reproduce it.

  was:
In our cluster we experienced a situation, in which the leader of all 
partitions lead by two brokers has been moved mainly to one other broker.

We don't know why this happend. At this time there was not broker outage, nor a 
broker shutdown has been initiated. The Zookeeper nodes of the affected brokers 
(/brokers/ids/3, /brokers/ids/4) has not been modified during this time.

In addition there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that becomes the new leader and follower. Our log level for the 
kafka.controller and the state change log is set to TRACE.

Though all Brokers are running, the situation does not 

[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6714:
--
Description: 
In our Kafka cluster we experienced a situation in which the Kafka controller 
had all brokers marked as "Shutting down", though in fact only one broker had 
been shut down.

The last log entry about the broker state before the entry stating that all 
brokers are shutting down says that no brokers are shutting down.

The consequence of this weird state is that the Kafka controller is not able 
to elect any partition leader.
{code:java}
kafka.controller Log (Level TRACE):
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
{code}
{code:java}
state.change.logger Log (Level TRACE):
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is: why does the Kafka controller assume that all brokers are 
shutting down?

The only place in the Kafka code (0.11.0.2) we found in which the shutting-down 
broker set is changed is in the class _kafka.controller.KafkaController_, in 
line 1407, in the method _doControlledShutdown_.
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, we should then see the log entry "Shutting down broker n" for all 
brokers in the log file, but it is not there.

This is a recurring problem; however, we cannot reproduce it.

  was:
In our Kafka cluster we experienced a situation in wich the Kafka controller 
has all Brokers marked as "Shutting down", though indeed only one Broker has 
been shut down.

The last log entry about the broker state before the entry that states that all 
brokers are shutting down states that no brokers are shutting down.

The consequence of this weird state is, that the Kafka controller is not able 
to elect any partition leader.
{code:java}
kafka.controller Log (Level TRACE):
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
{code}
{code:java}
state.change.logger Log (Level TRACE):
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down?

The only place in the Kafka code (0.11.0.2) we found in which the shutting down 
broker set is changed is in the class _kafka.controller.KafkaControler_ in line 
1407 in the method _doControlledShutdown_.
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new 

[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6715:
--
Description: 
In our cluster we experienced a situation in which the leadership of all 
partitions led by two brokers was moved mainly to one other broker.

We don't know why this happened. At that time there was no broker outage, nor 
had a broker shutdown been initiated. The Zookeeper nodes of the affected 
brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time.

In addition, there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well as a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that become the new leader and followers. Our log level for 
kafka.controller and the state change log is set to TRACE.

Though all brokers are running, the situation does not recover. It sticks in a 
highly imbalanced leader distribution, in which two brokers are not the leader 
for any partition and one broker is the leader for almost all partitions.
{code:java}
kafka-controller Log (Level TRACE):
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 0.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
{code}
The imbalance was recognized by the controller, but nothing happened.

In addition, it seems that the ReplicaFetcherThreads die without any log 
message (see attached stack trace), though we think this should not be 
possible... However, we would expect log messages stating that fetchers for 
partitions have been removed, as well as that the ReplicaFetcherThreads are 
shutting down. The log level for _kafka_ is set to INFO. In other situations, 
when a broker is shut down, we see such entries in the log files.

Besides that, this caused under-replicated partitions. It seems that no broker 
fetches from the partitions with the newly assigned leaders. As with the 
highly imbalanced leader distribution, the cluster sticks in this state and 
does not recover.

  was:
In our cluster we experienced a situation, in which the leader of all 
partitions lead by two brokers has been moved mainly to one other broker.

We don't know why this happend. At this time there was not broker outage, nor a 
broker shutdown has been initiated. The Zookeeper nodes of the affected brokers 
(/brokers/ids/3, /brokers/ids/4) has not been modified during this time.

In addition there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that becomes the new leader and follower. Our log level for the 
kafka.controller and the state change log is set to TRACE.

Though all Brokers are running, the situation does not recover. It sticks in a 
highly imbalanced leader distribution, in 

[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6715:
--
Attachment: 20180319-1756_kafka01-jvm-stack.dump

> Leader transition for all partitions lead by two brokers without visible 
> reason
> ---
>
> Key: KAFKA-6715
> URL: https://issues.apache.org/jira/browse/KAFKA-6715
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.11.0.2
> Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances 
> with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. 
> The cluster is distributed across 2 availability zones.
>Reporter: Uwe Eisele
>Priority: Critical
> Attachments: 20180319-1756_kafka01-jvm-stack.dump
>
>
> In our cluster we experienced a situation in which the leadership of all 
> partitions led by two brokers was moved mainly to one other broker.
> We don't know why this happened. At that time there was no broker outage, nor 
> had a broker shutdown been initiated. The Zookeeper nodes of the affected 
> brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time.
> In addition, there are no logs that would indicate a leader transition for 
> the affected brokers. We would expect to see a "{{sending become-leader 
> LeaderAndIsr request}}" in the controller log for each partition, as well as 
> a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
> brokers that become the new leader and followers. Our log level for 
> kafka.controller and the state change log is set to TRACE.
> Though all brokers are running, the situation does not recover. It sticks in 
> a highly imbalanced leader distribution, in which two brokers are not the 
> leader for any partition and one broker is the leader for almost all 
> partitions.
> {code:java}
> kafka-controller Log (Level TRACE):
> [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 5 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 2 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 3 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 4 is 0.0 (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
> [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
> [2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 3 is 1.0 (kafka.controller.KafkaController)
> [2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 4 is 1.0 (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
> [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
> [2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 3 is 1.0 (kafka.controller.KafkaController)
> [2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for 
> broker 4 is 1.0 (kafka.controller.KafkaController)
> {code}
> The imbalance was recognized by the controller, but nothing happened.
> In addition, it seems that the ReplicaFetcherThreads die without any log 
> message, though we think this should not be possible... However, we would 
> expect log messages stating that fetchers for partitions have been removed, 
> as well as that the ReplicaFetcherThreads are shutting down. The log level 
> for _kafka_ is set to INFO. In other situations, when a broker is shut down, 
> we see such entries in the log files.
> Besides that, this caused under-replicated partitions. It seems that no broker 
> fetches from the partitions with the newly assigned leaders. As with the 
> highly imbalanced leader distribution, the cluster sticks in this state and 
> does not recover.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason

2018-03-26 Thread Uwe Eisele (JIRA)
Uwe Eisele created KAFKA-6715:
-

 Summary: Leader transition for all partitions lead by two brokers 
without visible reason
 Key: KAFKA-6715
 URL: https://issues.apache.org/jira/browse/KAFKA-6715
 Project: Kafka
  Issue Type: Bug
  Components: core, replication
Affects Versions: 0.11.0.2
 Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances with 
5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. The 
cluster is distributed across 2 availability zones.
Reporter: Uwe Eisele


In our cluster we experienced a situation in which the leadership of all 
partitions led by two brokers was moved mainly to one other broker.

We don't know why this happened. At that time there was no broker outage, nor 
had a broker shutdown been initiated. The Zookeeper nodes of the affected 
brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time.

In addition, there are no logs that would indicate a leader transition for the 
affected brokers. We would expect to see a "{{sending become-leader 
LeaderAndIsr request}}" in the controller log for each partition, as well as a 
"{{completed LeaderAndIsr request}}" in the state change log of the Kafka 
brokers that become the new leader and followers. Our log level for 
kafka.controller and the state change log is set to TRACE.

Though all brokers are running, the situation does not recover. It sticks in a 
highly imbalanced leader distribution, in which two brokers are not the leader 
for any partition and one broker is the leader for almost all partitions.
{code:java}
kafka-controller Log (Level TRACE):
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 0.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for 
broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for 
broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for 
broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for 
broker 4 is 1.0 (kafka.controller.KafkaController)
{code}
The imbalance was recognized by the controller, but nothing happened.

In addition, it seems that the ReplicaFetcherThreads die without any log 
message, though we think this should not be possible... However, we would 
expect log messages stating that fetchers for partitions have been removed, as 
well as that the ReplicaFetcherThreads are shutting down. The log level for 
_kafka_ is set to INFO. In other situations, when a broker is shut down, we 
see such entries in the log files.

Besides that, this caused under-replicated partitions. It seems that no broker 
fetches from the partitions with the newly assigned leaders. As with the 
highly imbalanced leader distribution, the cluster sticks in this state and 
does not recover.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-03-26 Thread Narayan Periwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413789#comment-16413789
 ] 

Narayan Periwal commented on KAFKA-6681:


[~yuzhih...@gmail.com], 
We faced yet another such issue; on the server side we found these logs in this case:

{noformat}
[2018-03-23 18:59:16,560] INFO [GroupCoordinator 6]: Stabilized group 
prod-m10n-event-batcher-billablebeaconams1 generation 6 
(kafka.coordinator.GroupCoordinator)
[2018-03-23 18:59:46,561] INFO [GroupCoordinator 6]: Preparing to restabilize 
group prod-m10n-event-batcher-billablebeaconams1 with old generation 6 
(kafka.coordinator.GroupCoordinator)
[2018-03-23 18:59:46,833] INFO [GroupCoordinator 6]: Stabilized group 
prod-m10n-event-batcher-billablebeaconams1 generation 7 
(kafka.coordinator.GroupCoordinator)
{noformat}

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the Kafka consumer, the new library that was 
> introduced in 0.9.
> With this new client, the group management is done by the Kafka coordinator, 
> which is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 
> 0.10.2.1.
> The issue that we have faced is that, after rebalancing, some of the 
> partitions get consumed by 2 instances within a consumer group, leading to 
> duplication of the entire partition data. Both instances continue to read 
> until the next rebalancing, or the restart of those clients.
> It looks like a particular consumer keeps fetching data from a 
> partition, but the broker is not able to identify this "stale" consumer 
> instance.
> During this time, we also see the under-replicated partition metrics spiking. 
> We have hit this twice in production. Please look into it at the earliest.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6714:
--
Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances with 5 
nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. The cluster 
is distributed across 2 availability zones.  (was: Kafka Cluster on Amazon AWS 
EC2 r4.2xlarge instances with 5 nodes and a Zookeeper Cluster on r4.2xlarge 
instances with 3 nodes. The Cluster is distributed across 2 availability zones.)

> KafkaController marks all Brokers as "Shutting down", though only one broker 
> has been shut down
> ---
>
> Key: KAFKA-6714
> URL: https://issues.apache.org/jira/browse/KAFKA-6714
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.11.0.2
> Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances 
> with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. 
> The cluster is distributed across 2 availability zones.
>Reporter: Uwe Eisele
>Priority: Critical
>
> In our Kafka cluster we experienced a situation in which the Kafka controller 
> had all brokers marked as "Shutting down", though in fact only one broker had 
> been shut down.
> The last log entry about the broker state before the entry stating that all 
> brokers are shutting down says that no brokers are shutting down.
> The consequence of this weird state is that the Kafka controller is not able 
> to elect any partition leader.
> {code:java}
> [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
> (kafka.controller.KafkaController)
> [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
> (kafka.controller.KafkaController)
> [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
> (kafka.controller.KafkaController)
> ...
> [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in 
> the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
> [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
> the cluster: Set() (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
> (kafka.controller.KafkaController)
> [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
> 1,5,2,3,4 (kafka.controller.KafkaController)
> [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
> (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
> electing leader for partition 
> [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
> replicas in ISR 1,3,5 for 
> [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
> down brokers 1,5,2,3,4. (state.change.logger) {code}
> The question is why the Kafka controller assumes that all brokers are 
> shutting down.
> The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
> broker set being changed is in the class _kafka.controller.KafkaController_, 
> line 1407, in the method _doControlledShutdown_.
>  
> {code:java}
> info("Shutting down broker " + id)
> if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
>   throw new BrokerNotAvailableException("Broker id %d does not 
> exist.".format(id))
> controllerContext.shuttingDownBrokerIds.add(id)
> {code}
> However, in that case we should see the log entry "Shutting down broker n" 
> for every broker in the log file, but it is not there.
>  
>  
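
To make the bookkeeping under discussion easier to reason about, here is a small 
hedged Java model. It is not Kafka source and all class and method names are 
illustrative; it only captures the shape described above, a set that is added to 
solely on a controlled-shutdown request and removed from solely when a broker 
re-registers. With that shape, every id in the set should correspond to a logged 
shutdown request, which is exactly what the missing "Shutting down broker n" 
entries call into question.

{code:java}
// Illustrative model only; NOT the Kafka controller implementation.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ShuttingDownBookkeeping {
    private final Set<Integer> liveOrShuttingDownBrokerIds = new HashSet<>();
    private final Set<Integer> shuttingDownBrokerIds = new HashSet<>();

    /** Hypothetical: broker (re)registration is the only path that removes an id. */
    public void onBrokerStartup(int brokerId) {
        liveOrShuttingDownBrokerIds.add(brokerId);
        shuttingDownBrokerIds.remove(brokerId);
    }

    /** Mirrors the quoted snippet: the id is added unconditionally once the check passes. */
    public void onControlledShutdownRequest(int brokerId) {
        if (!liveOrShuttingDownBrokerIds.contains(brokerId)) {
            throw new IllegalStateException("Broker id " + brokerId + " does not exist.");
        }
        // If the shutdown later stalls or the broker dies without re-registering,
        // nothing in this model ever removes the id again.
        shuttingDownBrokerIds.add(brokerId);
    }

    public Set<Integer> shuttingDownBrokerIds() {
        return Collections.unmodifiableSet(shuttingDownBrokerIds);
    }
}
{code}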



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6714:
--
Description: 
In our Kafka cluster we experienced a situation in which the Kafka controller 
has all brokers marked as "Shutting down", although in fact only one broker has 
been shut down.

The last log entry about broker state before the entry claiming that all 
brokers are shutting down reports that no brokers are shutting down.

The consequence of this state is that the Kafka controller is not able to 
elect any partition leader.
{code:java}
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down.

The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
broker set being changed is in the class _kafka.controller.KafkaController_, 
line 1407, in the method _doControlledShutdown_.
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not 
exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, in that case we should see the log entry "Shutting down broker n" for 
every broker in the log file, but it is not there.

  was:
In our Kafka cluster we experienced a situation in which the Kafka controller 
has all brokers marked as "Shutting down", although in fact only one broker has 
been shut down.

The last log entry about broker state before the entry claiming that all 
brokers are shutting down reports that no brokers are shutting down.

The consequence of this state is that the Kafka controller is not able to 
elect any partition leader.
{code:java}
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down.

The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
broker set being changed is in the class _kafka.controller.KafkaController_, 
line 1407, in the method _doControlledShutdown_.

 
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not 
exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, in that case we should see the log entry "Shutting down broker n" for 
every broker in the log file, but it is not there.

 

 



[jira] [Created] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)
Uwe Eisele created KAFKA-6714:
-

 Summary: KafkaController marks all Brokers as "Shutting down", 
though only one broker has been shut down
 Key: KAFKA-6714
 URL: https://issues.apache.org/jira/browse/KAFKA-6714
 Project: Kafka
  Issue Type: Bug
  Components: controller, core
Affects Versions: 0.11.0.2
 Environment: Kafka Cluster on Amazon AWS EC2 r4.2xlarge instances with 
5 nodes and a Zookeeper Cluster on r4.2xlarge instances with 3 nodes. The 
Cluster is distributed across 2 availability zones.
Reporter: Uwe Eisele


In our Kafka cluster we experienced a situation in which the Kafka controller 
has all brokers marked as "Shutting down", although in fact only one broker has 
been shut down.

The last log entry about broker state before the entry claiming that all 
brokers are shutting down reports that no brokers are shutting down.

The consequence of this state is that the Kafka controller is not able to 
elect any partition leader.
{code:java}
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down.

The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
broker set being changed is in the class _kafka.controller.KafkaController_, 
line 1407, in the method _doControlledShutdown_.

 
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not 
exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, in that case we should see the log entry "Shutting down broker n" for 
every broker in the log file, but it is not there.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6714:
--
Priority: Critical  (was: Major)

> KafkaController marks all Brokers as "Shutting down", though only one broker 
> has been shut down
> ---
>
> Key: KAFKA-6714
> URL: https://issues.apache.org/jira/browse/KAFKA-6714
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.11.0.2
> Environment: Kafka Cluster on Amazon AWS EC2 r4.2xlarge instances 
> with 5 nodes and a Zookeeper Cluster on r4.2xlarge instances with 3 nodes. 
> The Cluster is distributed across 2 availability zones.
>Reporter: Uwe Eisele
>Priority: Critical
>
> In our Kafka cluster we experienced a situation in which the Kafka controller 
> has all brokers marked as "Shutting down", although in fact only one broker 
> has been shut down.
> The last log entry about broker state before the entry claiming that all 
> brokers are shutting down reports that no brokers are shutting down.
> The consequence of this state is that the Kafka controller is not able to 
> elect any partition leader.
> {code:java}
> [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
> (kafka.controller.KafkaController)
> [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
> (kafka.controller.KafkaController)
> [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
> (kafka.controller.KafkaController)
> ...
> [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in 
> the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
> [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
> the cluster: Set() (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
> (kafka.controller.KafkaController)
> [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
> 1,5,2,3,4 (kafka.controller.KafkaController)
> [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
> (kafka.controller.KafkaController)
> ...
> [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
> electing leader for partition 
> [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
> replicas in ISR 1,3,5 for 
> [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
> down brokers 1,5,2,3,4. (state.change.logger) {code}
> The question is why the Kafka controller assumes that all brokers are 
> shutting down.
> The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
> broker set being changed is in the class _kafka.controller.KafkaController_, 
> line 1407, in the method _doControlledShutdown_.
>  
> {code:java}
> info("Shutting down broker " + id)
> if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
>   throw new BrokerNotAvailableException("Broker id %d does not 
> exist.".format(id))
> controllerContext.shuttingDownBrokerIds.add(id)
> {code}
> However, in that case we should see the log entry "Shutting down broker n" 
> for every broker in the log file, but it is not there.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down

2018-03-26 Thread Uwe Eisele (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uwe Eisele updated KAFKA-6714:
--
Description: 
In our Kafka cluster we experienced a situation in which the Kafka controller 
has all brokers marked as "Shutting down", although in fact only one broker has 
been shut down.

The last log entry about broker state before the entry claiming that all 
brokers are shutting down reports that no brokers are shutting down.

The consequence of this state is that the Kafka controller is not able to 
elect any partition leader.
{code:java}
kafka.controller Log (Level TRACE):
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
{code}
{code:java}
state.change.logger Log (Level TRACE):
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down.

The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
broker set being changed is in the class _kafka.controller.KafkaController_, 
line 1407, in the method _doControlledShutdown_.
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not 
exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, in that case we should see the log entry "Shutting down broker n" for 
every broker in the log file, but it is not there.

  was:
In our Kafka cluster we experienced a situation in which the Kafka controller 
has all brokers marked as "Shutting down", although in fact only one broker has 
been shut down.

The last log entry about broker state before the entry claiming that all 
brokers are shutting down reports that no brokers are shutting down.

The consequence of this state is that the Kafka controller is not able to 
elect any partition leader.
{code:java}
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 
(kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 
(kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the 
cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 
(kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 
1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers:  
(kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while 
electing leader for partition 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other 
replicas in ISR 1,3,5 for 
[zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting 
down brokers 1,5,2,3,4. (state.change.logger) {code}
The question is why the Kafka controller assumes that all brokers are shutting 
down.

The only place in the Kafka code (0.11.0.2) where we found the shutting-down 
broker set being changed is in the class _kafka.controller.KafkaController_, 
line 1407, in the method _doControlledShutdown_.
{code:java}
info("Shutting down broker " + id)

if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not 
exist.".format(id))

controllerContext.shuttingDownBrokerIds.add(id)
{code}
However, in that case we should see the log entry "Shutting down broker n" for 
every broker in the log file, but it is not there.

[jira] [Commented] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries

2018-03-26 Thread Francesco Guardiani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413518#comment-16413518
 ] 

Francesco Guardiani commented on KAFKA-6712:


Yep

> Throw a specific exception with wrong topic name for interactive queries
> 
>
> Key: KAFKA-6712
> URL: https://issues.apache.org/jira/browse/KAFKA-6712
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Francesco Guardiani
>Priority: Major
>
> When you use the interactive query state stores with a wrong topic name and 
> call the store() method, the client should throw an exception that explains 
> that you have specified a wrong name. Currently it throws an 
> InvalidStateStoreException ("the state store may have migrated to another 
> instance"), which is too generic and is also used when the stream thread is 
> not ready.
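
As context for why a more specific exception helps, here is a hedged Java sketch 
of the retry loop callers typically end up writing today; the KafkaStreams 
instance is assumed to be already started and the store name is made up. With 
the current behaviour, a mistyped name passed to store() and a store that is 
genuinely still migrating are indistinguishable, so the loop spins until it 
gives up.

{code:java}
// Hedged sketch only: a generic "wait for the store" helper.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class StoreLookup {
    public static ReadOnlyKeyValueStore<String, Long> waitForStore(
            KafkaStreams streams, String storeName, int maxAttempts)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return streams.store(storeName,
                        QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (InvalidStateStoreException e) {
                // Cannot tell "no such store" apart from "store not ready yet",
                // which is precisely the improvement this ticket asks for.
                Thread.sleep(100);
            }
        }
        throw new InvalidStateStoreException(
                "Store " + storeName + " not queryable after " + maxAttempts + " attempts");
    }
}
{code}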



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group

2018-03-26 Thread Narayan Periwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16413500#comment-16413500
 ] 

Narayan Periwal commented on KAFKA-6681:


[~yuzhih...@gmail.com], any update on this?

> Two instances of kafka consumer reading the same partition within a consumer 
> group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
>Reporter: Narayan Periwal
>Priority: Critical
> Attachments: server-1.log, server-2.log
>
>
> We have seen this issue with the new Kafka consumer client introduced in 0.9.
> With this new client, group management is done by the group coordinator, which 
> is one of the Kafka brokers.
> We are using Kafka broker 0.10.2.1, and the consumer client version is also 
> 0.10.2.1.
> The issue we have faced is that, after a rebalance, some partitions get 
> consumed by two instances within the same consumer group, leading to 
> duplication of the entire partition data. Both instances continue to read 
> until the next rebalance or a restart of those clients.
> It looks like a particular consumer keeps fetching data from a partition, but 
> the broker is not able to identify this "stale" consumer instance.
> During this time, we also see the under-replicated partition metrics spiking.
> We have hit this twice in production. Please look into it at the earliest.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)