[jira] [Created] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-09 Thread James Cheng (JIRA)
James Cheng created KAFKA-7144:
--

 Summary: Kafka Streams doesn't properly balance partition 
assignment
 Key: KAFKA-7144
 URL: https://issues.apache.org/jira/browse/KAFKA-7144
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: James Cheng
 Attachments: OneThenTwelve.java

Kafka Streams doesn't always spread the tasks across all available 
instances/threads

I have a topology which consumes a single-partition topic and goes .through() a 
12-partition topic. That makes 13 partitions.

 

I then started 2 instances of the application. I would have expected the 13 
partitions to be split across the 2 instances roughly evenly (7 partitions on 
one, 6 partitions on the other).

Instead, one instance gets 12 partitions, and the other instance gets 1 
partition.

 

Repro case attached. I ran it a couple times, and it was fairly repeatable.

Setup for the repro:
{code:bash}
$ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one --partitions 1 --replication-factor 1
$ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve --partitions 12 --replication-factor 1
$ echo foo | kafkacat -P -b 127.0.0.1 -t one
{code}
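For comparison, the split the reporter expected corresponds to a simple round-robin distribution. The sketch below is purely illustrative (the class and names are hypothetical, and the actual Streams assignor is task-based and more involved):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distributing 13 partitions over 2 instances
// round-robin yields the expected 7/6 split, not the observed 12/1.
public class BalancedSplitSketch {
    public static void main(String[] args) {
        int partitions = 13;
        int instances = 2;
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % instances).add(p); // round-robin over instances
        }
        for (int i = 0; i < instances; i++) {
            System.out.println("instance-" + i + ": "
                    + assignment.get(i).size() + " partitions");
        }
    }
}
```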



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to Thread id check

2018-07-09 Thread Raman Gupta (JIRA)
Raman Gupta created KAFKA-7143:
--

 Summary: Cannot use KafkaConsumer with Kotlin coroutines due to 
Thread id check
 Key: KAFKA-7143
 URL: https://issues.apache.org/jira/browse/KAFKA-7143
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 1.1.0
Reporter: Raman Gupta


I am using the new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
support a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs such as Kafka's can easily be adapted to it). 
With coroutines, a method that takes a callback suspends the coroutine, which 
is resumed once the call completes. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT happen from a single 
thread -- a different underlying thread may execute the code after the 
suspension point.

However, the KafkaConsumer includes an additional check that verifies not only 
thread-safe usage of the client, but that the *same thread* (by id) is always 
used -- if a different thread is detected, the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.
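For illustration, here is a simplified sketch of the ownership check described above, modeled on the behavior of KafkaConsumer.acquire() (the refcount bookkeeping of the real implementation is omitted, and the class name is hypothetical):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the single-thread ownership check performed by
// KafkaConsumer.acquire(): remember the id of the owning thread and reject
// calls from any thread with a different id (refcounting omitted).
public class AcquireCheckSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    void acquire() {
        long id = Thread.currentThread().getId();
        if (id != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, id)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }

    public static void main(String[] args) throws InterruptedException {
        AcquireCheckSketch consumer = new AcquireCheckSketch();
        consumer.acquire(); // main thread now "owns" the consumer
        Thread resumed = new Thread(() -> {
            try {
                consumer.acquire(); // different thread id -> rejected
            } catch (ConcurrentModificationException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        });
        resumed.start();
        resumed.join();
        consumer.release();
    }
}
```

Because the check compares thread ids rather than detecting actual concurrent access, any framework that resumes work on a different pooled thread trips it, even when the calls are strictly serialized.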





[jira] [Commented] (KAFKA-6999) Document read-write lock usage of caching enabled stores

2018-07-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537967#comment-16537967
 ] 

ASF GitHub Bot commented on KAFKA-6999:
---

dongjinleekr opened a new pull request #5351: KAFKA-6999: Document read-write 
lock usage of caching enabled stores
URL: https://github.com/apache/kafka/pull/5351
 
 
   Add description for the deadlock vulnerability of `ReadOnlyKeyValueStore`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document read-write lock usage of caching enabled stores
> 
>
> Key: KAFKA-6999
> URL: https://issues.apache.org/jira/browse/KAFKA-6999
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> From the mailing list
> {quote}Hello again fellow Kafkans,
>  
> Yesterday we observed a production deadlock take down one of our instances. 
> Upon digging, it's clear that our usage of Kafka is the proximate cause, but 
> the danger of our approach is not clear at all just from the Javadocs.
>  
> We have stream processors that read off an incoming KStream and possibly 
> cross-reference some data from an auxiliary table via 
> ReadOnlyKeyValueStore.get().
>  
> This is done via custom logic rather than a direct KTable join because which 
> index is consulted may change depending on the shape of incoming data.
>  
> The ROKVS docs state,
>  * A key value store that only supports read operations.
>  * Implementations should be thread-safe as concurrent reads and writes
>  * are expected.
>  
> They do **not** indicate that the CachingKVS layer uses a ReadWriteLock. If 
> a flush from one CachingKVS causes a read from another CKVS, you are 
> suddenly vulnerable to classic lock-order reversals! Surprise!
>  
> A partial stack trace highlighting the problem, with many uninteresting 
> frames removed, is inline at the bottom of this mail.
>  
> You could probably rightly point out that allowing an "observer" pattern to 
> call back from within stream processing code is dangerous. We might move this 
> off to an auxiliary thread to alleviate the problem.
>  
> But the fact remains -- when you go and read the ROKVS documentation, it sure 
> doesn't prepare you for this possibility!
>  {quote}
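The lock-order reversal described in the mail reduces to the classic two-lock pattern below (a hypothetical sketch, not Kafka code; tryLock with a timeout is used so the demo terminates and reports the reversal instead of actually deadlocking):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the reversal: two stores, each guarded by its own
// ReadWriteLock. One thread "flushes" store1 and then reads store2; the
// other does the mirror image. With plain lock() this deadlocks once both
// flushes are in progress.
public class LockOrderReversalSketch {
    public static void main(String[] args) throws Exception {
        ReentrantReadWriteLock store1 = new ReentrantReadWriteLock();
        ReentrantReadWriteLock store2 = new ReentrantReadWriteLock();
        CountDownLatch bothFlushing = new CountDownLatch(2);

        Thread a = new Thread(() -> cross(store1, store2, bothFlushing));
        Thread b = new Thread(() -> cross(store2, store1, bothFlushing));
        a.start();
        b.start();
        a.join();
        b.join();
    }

    static void cross(ReentrantReadWriteLock first, ReentrantReadWriteLock second,
                      CountDownLatch latch) {
        first.writeLock().lock(); // "flush" our own store
        try {
            latch.countDown();
            latch.await(); // guarantee both flushes hold their write locks
            // "read" the other store from inside the flush callback
            if (second.readLock().tryLock(200, TimeUnit.MILLISECONDS)) {
                second.readLock().unlock();
            } else {
                System.out.println("lock-order reversal: would deadlock here");
            }
        } catch (InterruptedException ignored) {
        } finally {
            first.writeLock().unlock();
        }
    }
}
```

The standard remedy is to impose a single global lock order (or to never call into another store while holding a store's write lock), which is exactly the constraint the ROKVS javadoc does not mention.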





[jira] [Issue Comment Deleted] (KAFKA-4693) Consumer subscription change during rebalance causes exception

2018-07-09 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-4693:
--
Comment: was deleted

(was: Hi [~hachikuji] Is the validation performed when the request future 
succeeds (i.e. something that occurs after {{ConsumerRebalanceListener}}'s 
method is called) ? Because when I first ran through the code, there does not 
appear to be any place where there looks to be such a validation. (I was 
looking in the proximity of {{ConsumerCoordinator#onJoinComplete()}}))

> Consumer subscription change during rebalance causes exception
> --
>
> Key: KAFKA-4693
> URL: https://issues.apache.org/jira/browse/KAFKA-4693
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Minor
> Fix For: 2.1.0
>
>
> After every rebalance, the consumer validates that the assignment received 
> contains only partitions from topics that were subscribed. If not, then we 
> raise an exception to the user. It is possible for a wakeup or an interrupt 
> to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to 
> the coordinator in-flight). If the user then changes the topic subscription, 
> then this validation upon completion of the rebalance will fail. We should 
> probably detect the subscription change, eat the exception, and request 
> another rebalance. 
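A sketch of the proposed handling (set names are hypothetical; the real validation lives near ConsumerCoordinator#onJoinComplete): compare the received assignment against the current subscription, and if the subscription changed while the rebalance was in flight, request a rejoin instead of raising.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: an assignment from an in-flight rebalance may
// reference topics that are no longer subscribed. If the subscription
// changed mid-rebalance, swallow the mismatch and rejoin.
public class StaleAssignmentSketch {
    public static void main(String[] args) {
        Set<String> subscriptionAtJoin = Set.of("topic-a", "topic-b");
        Set<String> subscriptionNow = Set.of("topic-a", "topic-c"); // changed mid-rebalance
        Set<String> assigned = Set.of("topic-a", "topic-b");        // from the old JoinGroup

        Set<String> unexpected = new HashSet<>(assigned);
        unexpected.removeAll(subscriptionNow);
        if (!unexpected.isEmpty()) {
            if (!subscriptionNow.equals(subscriptionAtJoin)) {
                System.out.println("subscription changed; requesting rejoin");
            } else {
                System.out.println("unexpected assignment: " + unexpected);
            }
        }
    }
}
```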





[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Priority: Major  (was: Minor)

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 
> 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Major
>
> In our production cluster, we noticed that when a large consumer group (a few 
> thousand members) is rebalancing, the produce latency of the coordinator 
> broker can jump to several seconds.
>  
> Group rebalance is a very frequent operation; it can be triggered by adding, 
> removing, or restarting a single member of the consumer group.
>  
> When this happens, jstack shows all the request handler threads of the broker 
> are waiting for group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
> nid=0x1b985 waiting on condition [0x7f98f1adb000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00024aa73b20> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>   
> All the handler threads wait, besides one that is in either 
> GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
> nid=0x1b984 runnable [0x7f98f1bdc000]
>java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at 
> kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at 
> kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
> nid=0x1ceff runnable [0x7fe8701ca000]
>java.lang.Thread.State: RUNNABLE
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at 
> 
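The contention pattern in the stack traces above reduces to the sketch below (a hypothetical stand-in, not the coordinator code): work proportional to the number of members runs while holding the group's ReentrantLock, so every other request handler thread queues behind it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the contention: candidateProtocols()-style work
// that touches every member runs under one ReentrantLock, so the lock is
// held for O(members) time and the handler threads serialize behind it.
public class GroupLockContentionSketch {
    static final ReentrantLock groupLock = new ReentrantLock();
    static final List<String> members = new ArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5000; i++) {
            members.add("member-" + i); // a "large" consumer group
        }

        Runnable joinGroup = () -> {
            groupLock.lock(); // every JoinGroup takes the same group lock
            try {
                // stand-in for per-member work done inside the critical section
                long matched = members.stream()
                        .filter(m -> m.endsWith("9"))
                        .count();
                System.out.println(Thread.currentThread().getName()
                        + " matched " + matched);
            } finally {
                groupLock.unlock();
            }
        };

        Thread[] handlers = new Thread[4];
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(joinGroup, "handler-" + i);
            handlers[i].start();
        }
        for (Thread t : handlers) {
            t.join();
        }
    }
}
```

Shrinking the critical section (for example, precomputing per-member data outside the lock) would bound how long any single JoinGroup can block the other handlers.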

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Description: 

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Affects Version/s: (was: 0.10.2.0)
   0.9.0.0
   0.10.0.0
   0.11.0.0
   1.0.0

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 
> 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
>
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Affects Version/s: 0.10.1.0
   0.10.2.0

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 
> 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
>

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Affects Version/s: 1.1.0

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0, 1.1.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
>

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Affects Version/s: 0.10.2.0

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
>
> In our production cluster, we noticed that when a large consumer group (a few 
> thousand members) is rebalancing, the produce latency of the coordinator 
> broker can jump to several seconds.
>  
> When this happens, jstack shows all the request handler threads of the broker 
> are waiting for group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
> nid=0x1b985 waiting on condition [0x7f98f1adb000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00024aa73b20> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>   
> Besides, one thread is either doing GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
> nid=0x1b984 runnable [0x7f98f1bdc000]
>java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at 
> kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at 
> kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin():
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
> nid=0x1ceff runnable [0x7fe8701ca000]
>java.lang.Thread.State: RUNNABLE
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at 

[jira] [Assigned] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng reassigned KAFKA-7142:
-

Assignee: Ying Zheng

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.0
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
>
> In our production cluster, we noticed that when a large consumer group (a few 
> thousand members) is rebalancing, the produce latency of the coordinator 
> broker can jump to several seconds.
>  
> When this happens, jstack shows all the request handler threads of the broker 
> are waiting for group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
> nid=0x1b985 waiting on condition [0x7f98f1adb000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00024aa73b20> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
>   
> Besides, one thread is either doing GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
> nid=0x1b984 runnable [0x7f98f1bdc000]
>java.lang.Thread.State: RUNNABLE
> at scala.collection.immutable.List.map(List.scala:284)
> at 
> kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
> at scala.collection.immutable.List.map(List.scala:288)
> at 
> kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
> at 
> kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
> at 
> kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
> at 
> kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin():
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
> nid=0x1ceff runnable [0x7fe8701ca000]
>java.lang.Thread.State: RUNNABLE
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at 
> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at 

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Description: 
In our production cluster, we noticed that when a large consumer group (a few 
thousand members) is rebalancing, the produce latency of the coordinator broker 
can jump to several seconds.

 

When this happens, jstack shows all the request handler threads of the broker 
are waiting for group lock:
{noformat}
"kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 
nid=0x1b985 waiting on condition [0x7f98f1adb000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00024aa73b20> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
at 
kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
at 
kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745){noformat}
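All of the parked threads above are blocked in CoreUtils.inLock, which acquires the group's ReentrantLock and runs the supplied body in a try/finally. A minimal Java rendering of that pattern (the real CoreUtils.inLock is Scala; this sketch is only for illustration):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class InLock {
    /** Run fun while holding lock, always releasing it afterwards. */
    public static <T> T inLock(Lock lock, Supplier<T> fun) {
        lock.lock();
        try {
            return fun.get();
        } finally {
            lock.unlock();
        }
    }
}
```

Because every group operation funnels through this one lock, any long-running body blocks all other request handler threads touching the same group.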
  

Besides, one thread is either doing GroupMetadata.supportsProtocols():
{noformat}
"kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 
nid=0x1b984 runnable [0x7f98f1bdc000]
   java.lang.Thread.State: RUNNABLE
at scala.collection.immutable.List.map(List.scala:284)
at 
kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
at 
kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
at 
kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
at scala.collection.immutable.List.map(List.scala:288)
at 
kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
at 
kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
at 
kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
at 
kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
at 
kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745){noformat}
or GroupCoordinator.tryCompleteJoin():
{noformat}
"kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 
nid=0x1ceff runnable [0x7fe8701ca000]
   java.lang.Thread.State: RUNNABLE
at 
scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at 
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
at 
scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
at 
scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at 
scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at 
kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
at 
kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
at 
kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
at 

[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Zheng updated KAFKA-7142:
--
Priority: Minor  (was: Major)

> Rebalancing large consumer group can block the coordinator broker for several 
> seconds
> -
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ying Zheng
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
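Tying the traces above together: each JoinGroup request runs GroupMetadata.supportsProtocols, which via candidateProtocols walks the protocol lists of every current member while holding the group lock. A rough, hypothetical Java sketch of that cost (not Kafka's actual code) shows why a rebalance of n members amounts to O(n²) work under the lock:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ProtocolIntersection {

    /**
     * Mimics candidateProtocols: intersect the protocol sets of all members.
     * Called once per joining member this is O(members) work, so a full
     * rebalance of n members costs O(n^2) -- all while the group lock is held.
     */
    public static Set<String> candidateProtocols(List<Set<String>> memberProtocols) {
        Set<String> candidates = new HashSet<>(memberProtocols.get(0));
        for (Set<String> protocols : memberProtocols) {
            candidates.retainAll(protocols);
        }
        return candidates;
    }

    /** supportsProtocols-style check: does the joiner share a protocol with the group? */
    public static boolean supportsProtocols(List<Set<String>> members, Set<String> offered) {
        Set<String> candidates = candidateProtocols(members);
        candidates.retainAll(offered);
        return !candidates.isEmpty();
    }
}
```

With a few thousand members, that quadratic walk under the lock is enough to stall every other request handler thread for seconds, matching the produce-latency spikes reported above.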


[jira] [Created] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-7142:
-

 Summary: Rebalancing large consumer group can block the 
coordinator broker for several seconds
 Key: KAFKA-7142
 URL: https://issues.apache.org/jira/browse/KAFKA-7142
 Project: Kafka
  Issue Type: Improvement
Reporter: Ying Zheng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-09 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537665#comment-16537665
 ] 

Vahid Hashemian commented on KAFKA-7141:


[~kioria] thanks for reporting the issue. Do you see that output after manually 
committing offsets? Or before? It would be a great help if you could provide 
the steps that reproduce the issue. Thanks!

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: akka-stream-kafka consumer with standard config 
> section as described in the 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
> kafka-console-consumer.
> akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
>  
>  I am able to see both groups with the command
>  
>  *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
>  _Note: This will not show information about old Zookeeper-based consumers._
> TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
> STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
>  _Note: This will not show information about old Zookeeper-based consumers._
> TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2018-07-09 Thread Raman Gupta (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537274#comment-16537274
 ] 

Raman Gupta commented on KAFKA-4740:


I'm still a little confused about how this should work, though: position() 
requires a partition as an argument, but how can I get that information? I only 
see that the consumer knows the *set* of partitions assigned to it, not the 
partition that contains the message which failed to deserialize.
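One fragile workaround (an assumption on my part, since it relies on the exception message format shown in the quoted logs below, e.g. "Error deserializing key/value for partition topic-0 at offset 2") is to parse the topic-partition and offset out of the SerializationException message:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BadRecordLocator {

    // Assumes the message format seen in the logs:
    // "Error deserializing key/value for partition <topic>-<partition> at offset <offset>"
    private static final Pattern LOCATION =
            Pattern.compile("partition (\\S+)-(\\d+) at offset (\\d+)");

    /** Returns {topic, partition, offset} parsed from the message, or null if it doesn't match. */
    public static String[] locate(String message) {
        Matcher m = LOCATION.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }
}
```

With the topic, partition, and offset in hand, the client could build the TopicPartition and call seek(tp, offset + 1) before the next poll() to skip the bad record.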

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Assignee: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, the 
> exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on GitHub \[2\] for most 
> released kafka-clients versions) demonstrates the problem:
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                             record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> 

[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2018-07-09 Thread Raman Gupta (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537264#comment-16537264
 ] 

Raman Gupta commented on KAFKA-4740:


Reading through the other linked issues and github pull requests, it *seems* 
like the intended behavior now, if client code wishes to seek past a bad 
message proactively, is for the client code to use the `position` and `seek` 
methods on the underlying KafkaConsumer when this exception occurs. While the 
exception itself does not contain the offset information, the position of the 
consumer should be correct (see [this 
comment|https://issues.apache.org/jira/browse/KAFKA-5211?focusedCommentId=16024066&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16024066]
 from [~hachikuji]).

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Assignee: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, the 
> exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on GitHub \[2\] for most 
> released kafka-clients versions) demonstrates the problem:
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                             record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> 

[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-09 Thread Bohdana Panchenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bohdana Panchenko updated KAFKA-7141:
-
Description: 
I am running two consumers: akka-stream-kafka consumer with standard config 
section as described in the 
[https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
kafka-console-consumer.

akka-stream-kafka consumer configuration looks like this:

akka.kafka.consumer {
  kafka-clients {
    group.id = "myakkastreamkafka-1"
    enable.auto.commit = false
  }
}

I am able to see both groups with the command
 
 *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
 _Note: This will not show information about old Zookeeper-based consumers._
 
 _myakkastreamkafka-1_
 _console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

That is strange. Can you please check the issue?

  was:
I am running two consumers: akka-stream-kafka consumer with standard config 
section as described in the 
[https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and 
kafka-console-consumer.

akka-stream-kafka consumer configuration looks like this:

akka.kafka.consumer {
  kafka-clients {
    group.id = "myakkastreamkafka-1"
    enable.auto.commit = false
  }
}

I am able to see both groups with the command
 
 *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
 _Note: This will not show information about old Zookeeper-based consumers._
 
 _myakkastreamkafka-1_
 _console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

That is strange. Can you please check the issue?


> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: akka-stream-kafka consumer with standard config 
> section as described in the 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
> kafka-console-consumer.
> akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
>  
>  I am able to see both groups with the command
>  
>  *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group
> 

[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-09 Thread Bohdana Panchenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bohdana Panchenko updated KAFKA-7141:
-
Description: 
I am running two consumers: akka-stream-kafka consumer with standard config 
section as described in the 
[https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and  
kafka-console-consumer.

akka-stream-kafka consumer configuration looks like this:

akka.kafka.consumer {
  kafka-clients {
    group.id = "myakkastreamkafka-1"
    enable.auto.commit = false
  }
}

I am able to see both groups with the command
 
 *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
 _Note: This will not show information about old Zookeeper-based consumers._
 
 _myakkastreamkafka-1_
 _console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
 _Note: This will not show information about old Zookeeper-based consumers._

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

That is strange. Can you please check the issue?

  was:
I am running two consumers: an akka-stream-kafka consumer with the standard 
config section as described in 
[https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and 
kafka-console-consumer.

The akka-stream-kafka consumer configuration looks like this:
{code}
kafka-clients {
  group.id = "myakkastreamkafka-1"
  enable.auto.commit = false
}
{code}
I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
_Note: This will not show information about old Zookeeper-based consumers._

_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
_Note: This will not show information about old Zookeeper-based consumers._

{code}
TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
STREAM-TEST  0          0               0               0    consumer-1-6b928e07-196a-4322-9928-068681617878  /172.19.0.4  consumer-1
{code}

But the command to describe my akka-stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._

{code}
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
{code}

That is strange. Can you please check the issue?


> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> {code}
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> {code}
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
> I am able to view details about the console consumer group
> 


[jira] [Created] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-09 Thread Bohdana Panchenko (JIRA)
Bohdana Panchenko created KAFKA-7141:


 Summary: kafka-consumer-group doesn't describe existing group
 Key: KAFKA-7141
 URL: https://issues.apache.org/jira/browse/KAFKA-7141
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.0.1, 0.11.0.0
Reporter: Bohdana Panchenko


I am running two consumers: an akka-stream-kafka consumer with the standard 
config section as described in 
[https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html] and 
kafka-console-consumer.

I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
_Note: This will not show information about old Zookeeper-based consumers._

_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
_Note: This will not show information about old Zookeeper-based consumers._

{code}
TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
STREAM-TEST  0          0               0               0    consumer-1-6b928e07-196a-4322-9928-068681617878  /172.19.0.4  consumer-1
{code}

But the command to describe my akka-stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._

{code}
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
{code}

That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7140) Remove deprecated poll usages

2018-07-09 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7140:
-

 Summary: Remove deprecated poll usages
 Key: KAFKA-7140
 URL: https://issues.apache.org/jira/browse/KAFKA-7140
 Project: Kafka
  Issue Type: Improvement
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi


There are a couple of poll(long) usages of the consumer in test and non-test 
code. This jira aims to remove the non-test usages of the deprecated method.
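
For context, the deprecated long-millis overload is replaced by the 
Duration overload introduced in KIP-266 (Kafka 2.0). A toy sketch of the 
delegation pattern (class and method names here are hypothetical stand-ins, 
not the real KafkaConsumer):
{code:java}
import java.time.Duration;

// Toy stand-in illustrating the KIP-266-style migration: the old
// long-millis overload is deprecated and delegates to the Duration overload.
class ToyConsumer {
    String lastCall;

    @Deprecated
    int poll(long timeoutMs) {              // old API: bare milliseconds
        return poll(Duration.ofMillis(timeoutMs));
    }

    int poll(Duration timeout) {            // new API: explicit time bound
        lastCall = "poll(Duration)";
        return (int) timeout.toMillis();
    }
}

public class PollMigration {
    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        int viaOld = c.poll(100L);                    // deprecated path
        int viaNew = c.poll(Duration.ofMillis(100));  // preferred path
        System.out.println(viaOld == viaNew);         // prints true
    }
}
{code}
In the real client the Duration overload additionally bounds the whole call, 
including metadata fetches, which the old overload did not.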



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7139) Support to exclude the internal topics in kafka-topics.sh command

2018-07-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536761#comment-16536761
 ] 

ASF GitHub Bot commented on KAFKA-7139:
---

chia7712 opened a new pull request #5349: KAFKA-7139 Support to exclude the 
internal topics in kafka-topics.sh …
URL: https://github.com/apache/kafka/pull/5349
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support to exclude the internal topics in kafka-topics.sh command
> -
>
> Key: KAFKA-7139
> URL: https://issues.apache.org/jira/browse/KAFKA-7139
> Project: Kafka
>  Issue Type: Task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Usually we have no interest in internal topics. However, kafka-topics.sh 
> can't easily exclude the internal topics; we have to write a regular 
> expression to exclude them. This issue tries to add a flag (perhaps 
> "exclude-internal") to kafka-topics.sh. If the user sets the flag, all 
> internal topics will be excluded by the following commands - "list", 
> "describe", "delete" and "alter"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7139) Support to exclude the internal topics in kafka-topics.sh command

2018-07-09 Thread Chia-Ping Tsai (JIRA)
Chia-Ping Tsai created KAFKA-7139:
-

 Summary: Support to exclude the internal topics in kafka-topics.sh 
command
 Key: KAFKA-7139
 URL: https://issues.apache.org/jira/browse/KAFKA-7139
 Project: Kafka
  Issue Type: Task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Usually we have no interest in internal topics. However, kafka-topics.sh 
can't easily exclude the internal topics; we have to write a regular 
expression to exclude them. This issue tries to add a flag (perhaps 
"exclude-internal") to kafka-topics.sh. If the user sets the flag, all 
internal topics will be excluded by the following commands - "list", 
"describe", "delete" and "alter"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4693) Consumer subscription change during rebalance causes exception

2018-07-09 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536693#comment-16536693
 ] 

Richard Yu commented on KAFKA-4693:
---

Hi [~hachikuji] Is the validation performed when the request future succeeds 
(i.e. something that occurs after {{ConsumerRebalanceListener}}'s method is 
called) ? Because when I first ran through the code, there does not appear to 
be any place where there looks to be such a validation. (I was looking in the 
proximity of {{ConsumerCoordinator#onJoinComplete()}})

> Consumer subscription change during rebalance causes exception
> --
>
> Key: KAFKA-4693
> URL: https://issues.apache.org/jira/browse/KAFKA-4693
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Priority: Minor
> Fix For: 2.1.0
>
>
> After every rebalance, the consumer validates that the assignment received 
> contains only partitions from topics that were subscribed. If not, then we 
> raise an exception to the user. It is possible for a wakeup or an interrupt 
> to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to 
> the coordinator in-flight). If the user then changes the topic subscription, 
> then this validation upon completion of the rebalance will fail. We should 
> probably detect the subscription change, eat the exception, and request 
> another rebalance. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7132) Consider adding faster form of rebalancing

2018-07-09 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-7132:
-

Assignee: Richard Yu

> Consider adding faster form of rebalancing
> --
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Critical
>  Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart 
> processing from the last checkpointed offset. However, this design could 
> result in a lag which some users could not afford to let happen. For example, 
> let's say a consumer crashed at offset 100, with the last checkpointed offset 
> being 70. When it recovers at a later offset (say, 120), it will be behind 
> by an offset range of 50 (120 - 70). This is because the consumer restarted 
> at 70, forcing it to reprocess old data. To avoid this, one option would be 
> to allow the current consumer to start processing not from the last 
> checkpointed offset (70 in the example), but from 120 where it recovers. 
> Meanwhile, a new KafkaConsumer would be instantiated and start reading from 
> offset 70 concurrently with the old process, and would be terminated once it 
> reaches 120. In this manner, a considerable amount of lag can be avoided, 
> particularly since the old consumer could proceed as if nothing had happened. 
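
The offset arithmetic from the ticket's example can be sketched as follows 
(a toy illustration of the proposal, not Kafka API code; all names are 
hypothetical):
{code:java}
public class CatchUpRebalance {
    // Offsets the status-quo design reprocesses after a crash: resume at the
    // checkpoint and replay everything up to the recovery point.
    static long reprocessedOffsets(long checkpoint, long recovery) {
        return recovery - checkpoint;
    }

    public static void main(String[] args) {
        long checkpoint = 70;   // last committed offset before the crash
        long recovery   = 120;  // log position when the consumer rejoins

        // Status quo: 50 offsets are replayed before any new data is reached.
        System.out.println(reprocessedOffsets(checkpoint, recovery)); // 50

        // Proposal sketched in the ticket: the recovered consumer continues
        // from `recovery`, while a temporary catch-up consumer replays the
        // range [checkpoint, recovery) concurrently and then shuts down.
        long[] catchUpRange = {checkpoint, recovery};
        System.out.println(catchUpRange[1] - catchUpRange[0]); // 50, off the hot path
    }
}
{code}
The same amount of data is read either way; the difference is that the 
catch-up work no longer delays processing of fresh records.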



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)