[jira] [Created] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment
James Cheng created KAFKA-7144: -- Summary: Kafka Streams doesn't properly balance partition assignment Key: KAFKA-7144 URL: https://issues.apache.org/jira/browse/KAFKA-7144 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: James Cheng Attachments: OneThenTwelve.java Kafka Streams doesn't always spread the tasks across all available instances/threads. I have a topology which consumes a single-partition topic and goes .through() a 12-partition topic. That makes 13 partitions. I then started 2 instances of the application. I would have expected the 13 partitions to be split across the 2 instances roughly evenly (7 partitions on one, 6 on the other). Instead, one instance gets 12 partitions, and the other instance gets 1 partition. Repro case attached. I ran it a couple of times, and it was fairly repeatable. Setup for the repro: {code:java} $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one --partitions 1 --replication-factor 1 $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve --partitions 12 --replication-factor 1 $ echo foo | kafkacat -P -b 127.0.0.1 -t one {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
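For context on what "roughly evenly" means here: a plain round-robin spread of 13 tasks over 2 instances comes out 7 and 6. The sketch below illustrates that expected outcome only; it is not the actual StreamsPartitionAssignor logic, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class EvenSpread {
    // Spread `tasks` task ids over `instances` buckets round-robin,
    // so bucket sizes differ by at most one.
    static List<List<Integer>> spread(int tasks, int instances) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < instances; i++) buckets.add(new ArrayList<>());
        for (int t = 0; t < tasks; t++) buckets.get(t % instances).add(t);
        return buckets;
    }

    public static void main(String[] args) {
        // 13 partitions (1 from "one" + 12 from "twelve") over 2 instances.
        List<List<Integer>> b = spread(13, 2);
        System.out.println(b.get(0).size() + " / " + b.get(1).size()); // 7 / 6
    }
}
```

The report is that the real assignor produced a 12/1 split instead of anything close to this 7/6 shape.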
[jira] [Created] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to Thread id check
Raman Gupta created KAFKA-7143: -- Summary: Cannot use KafkaConsumer with Kotlin coroutines due to Thread id check Key: KAFKA-7143 URL: https://issues.apache.org/jira/browse/KAFKA-7143 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 1.1.0 Reporter: Raman Gupta I am using the new KafkaConsumer via Kotlin. My application makes use of Kotlin [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which support a style of async programming that avoids the need for callbacks (and existing callback-based APIs such as Kafka's can easily be adapted to this). With coroutines, methods with callbacks are suspended and resumed once the call is complete. With this approach, while access to the KafkaConsumer is done in a thread-safe way, it does NOT happen from a single thread -- a different underlying thread may actually execute the code after the suspension point. However, the KafkaConsumer includes additional checks to verify not only the thread safety of the client, but that the *same thread* is being used -- if the same thread (by id) is not being used, the consumer throws an exception like: {code} Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321) {code} I understand this check is present to protect people from themselves, but I'd like the ability to disable it so that this code can be used effectively by libraries such as Kotlin coroutines. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
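The check behind the stack trace above can be sketched as follows. This is a simplified standalone model of the acquire/release guard, not the actual KafkaConsumer source; the class and field names are illustrative. The essential point is that ownership is tracked by thread id, so a strictly serialized caller still fails if it arrives on a different thread.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    // Id of the thread currently holding the guard, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Succeeds only when the guard is free or already held by this thread.
    public void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    public void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
```

A coroutine resumed on a different pool thread after a suspension point trips this check even though its calls never overlap, which is exactly the reporter's complaint; confining the consumer to a single-threaded dispatcher is the usual workaround.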
[jira] [Commented] (KAFKA-6999) Document read-write lock usage of caching enabled stores
[ https://issues.apache.org/jira/browse/KAFKA-6999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537967#comment-16537967 ] ASF GitHub Bot commented on KAFKA-6999: --- dongjinleekr opened a new pull request #5351: KAFKA-6999: Document read-write lock usage of caching enabled stores URL: https://github.com/apache/kafka/pull/5351 Add a description of the deadlock vulnerability of `ReadOnlyKeyValueStore`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document read-write lock usage of caching enabled stores > > > Key: KAFKA-6999 > URL: https://issues.apache.org/jira/browse/KAFKA-6999 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > > From the mailing list > {quote}Hello again fellow Kafkans, > > Yesterday we observed a production deadlock take down one of our instances. > Upon digging, it's clear that our usage of Kafka is the proximate cause, but > the danger of our approach is not clear at all just from the Javadocs. > > We have stream processors that read off an incoming KStream, possibly > cross-referencing some data from an auxiliary table via > ReadOnlyKeyValueStore.get(). > > This is done via custom logic rather than a direct KTable join, because which > index is consulted may change depending on the shape of incoming data. > > The ROKVS docs state, > * A key value store that only supports read operations. > * Implementations should be thread-safe as concurrent reads and writes > * are expected. > > They do **not** indicate that the CachingKVS layer uses a ReadWriteLock. If > a flush from one CachingKVS causes a read from another CKVS, you > are suddenly vulnerable to classic lock order reversals! Surprise! 
> > A partial stack trace highlighting the problem, with many uninteresting > frames removed, is inline at the bottom of this mail. > > You could probably rightly point out that allowing an "observer" pattern to > call back from within streams processing code is dangerous. We might move this > off to an auxiliary thread to alleviate the problem. > > But it still remains -- when you go and read that ROKVS documentation, it sure > doesn't prepare you for this possibility! > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
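The standard way out of the lock-order reversal the reporter describes is to impose a single global acquisition order. The sketch below is not Streams code; the `Store` class and its ids are made up for illustration. Whenever an operation needs locks on two stores, it takes them in ascending-id order, so no cycle (and hence no deadlock) can form even when callers name the stores in opposite orders.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OrderedStores {
    static class Store {
        final int id;
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        Store(int id) { this.id = id; }
    }

    // Acquire both store locks in ascending-id order, run the action,
    // then release in reverse order.
    static void withBoth(Store a, Store b, Runnable action) {
        Store first = a.id <= b.id ? a : b;
        Store second = a.id <= b.id ? b : a;
        first.lock.writeLock().lock();
        try {
            second.lock.writeLock().lock();
            try {
                action.run();
            } finally {
                second.lock.writeLock().unlock();
            }
        } finally {
            first.lock.writeLock().unlock();
        }
    }

    // Two threads touch the stores in opposite argument order; the canonical
    // ordering inside withBoth prevents the classic reversal deadlock.
    static int run() throws InterruptedException {
        Store s1 = new Store(1), s2 = new Store(2);
        int[] count = {0};
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) withBoth(s1, s2, () -> count[0]++);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) withBoth(s2, s1, () -> count[0]++);
        });
        t1.start(); t2.start();
        t1.join(); t2.join();
        return count[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // 2000
    }
}
```

Without the ordering (each caller locking its first argument first), the same two threads could deadlock exactly as in the reported flush-triggers-read scenario.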
[jira] [Issue Comment Deleted] (KAFKA-4693) Consumer subscription change during rebalance causes exception
[ https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-4693: -- Comment: was deleted (was: Hi [~hachikuji] Is the validation performed when the request future succeeds (i.e. something that occurs after {{ConsumerRebalanceListener}}'s method is called)? When I first ran through the code, I could not find any place where such a validation is performed. (I was looking in the proximity of {{ConsumerCoordinator#onJoinComplete()}})) > Consumer subscription change during rebalance causes exception > -- > > Key: KAFKA-4693 > URL: https://issues.apache.org/jira/browse/KAFKA-4693 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Jason Gustafson >Priority: Minor > Fix For: 2.1.0 > > > After every rebalance, the consumer validates that the assignment received > contains only partitions from topics that were subscribed. If not, then we > raise an exception to the user. It is possible for a wakeup or an interrupt > to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to > the coordinator in-flight). If the user then changes the topic subscription, > then this validation upon completion of the rebalance will fail. We should > probably detect the subscription change, eat the exception, and request > another rebalance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
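The validation discussed in this issue amounts to a subset check of the assignment's topics against the current subscription. The sketch below is a simplified illustration, not the actual ConsumerCoordinator code, and the method names are hypothetical; the proposed behavior change is to treat a mismatch after a mid-rebalance subscription change as "request another rebalance" rather than an error surfaced to the user.

```java
import java.util.Set;

public class AssignmentCheck {
    // True iff every topic in the received assignment is still subscribed.
    static boolean assignmentMatchesSubscription(Set<String> assignedTopics,
                                                 Set<String> subscribedTopics) {
        return subscribedTopics.containsAll(assignedTopics);
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("foo", "bar");
        // Subscription changed while the rebalance was in flight: "bar" dropped.
        boolean ok = assignmentMatchesSubscription(assigned, Set.of("foo"));
        // Instead of raising an exception, the consumer could eat the
        // mismatch here and rejoin the group.
        System.out.println(ok ? "valid" : "request rebalance");
    }
}
```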
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Priority: Major (was: Minor) > Rebalancing large consumer group can block the coordinator broker for several > seconds > - > > Key: KAFKA-7142 > URL: https://issues.apache.org/jira/browse/KAFKA-7142 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, > 1.1.0 >Reporter: Ying Zheng >Assignee: Ying Zheng >Priority: Major > > In our production cluster, we noticed that when a large consumer group (a few > thousand members) is rebalancing, the produce latency of the coordinator > broker can jump to several seconds. > > Group rebalance is a very frequent operation, it can be triggered by adding / > removing / restarting a single member in the consumer group. > > When this happens, jstack shows all the request handler threads of the broker > are waiting for group lock: > {noformat} > "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 > nid=0x1b985 waiting on condition [0x7f98f1adb000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00024aa73b20> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) > at > 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745){noformat} > > Besides one thread that is either doing GroupMetadata.supportsProtocols(): > {noformat} > "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 > nid=0x1b984 runnable [0x7f98f1bdc000] >java.lang.Thread.State: RUNNABLE > at scala.collection.immutable.List.map(List.scala:284) > at > kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) > at scala.collection.immutable.List.map(List.scala:288) > at > kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265) > at > kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270) > at > kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) > at > kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) > at > kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) > at kafka.server.KafkaApis.handle(KafkaApis.scala:115) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at 
java.lang.Thread.run(Thread.java:745){noformat} > or GroupCoordinator.tryCompleteJoin > {noformat} > "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 > nid=0x1ceff runnable [0x7fe8701ca000] >java.lang.Thread.State: RUNNABLE > at > scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) > at > scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139) > at >
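The runnable thread in the traces above is recomputing, under the group lock, the protocols that every member supports — O(members x protocols) work per JoinGroup, which is painful for a group of a few thousand members. One plausible mitigation, sketched below as a general idea rather than the actual patch for this ticket, is to maintain a running tally of how many members support each protocol as members join and leave, so the support check itself drops to O(protocols):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProtocolTally {
    // protocol name -> number of current members supporting it
    private final Map<String, Integer> supportCount = new HashMap<>();
    private int members = 0;

    void addMember(List<String> protocols) {
        members++;
        for (String p : protocols) supportCount.merge(p, 1, Integer::sum);
    }

    void removeMember(List<String> protocols) {
        members--;
        for (String p : protocols) supportCount.merge(p, -1, Integer::sum);
    }

    // A joiner's protocol list is workable iff at least one of its protocols
    // is supported by every current member -- checked against the tally
    // instead of rescanning all members.
    boolean supportsProtocols(List<String> candidate) {
        for (String p : candidate) {
            if (supportCount.getOrDefault(p, 0) == members) return true;
        }
        return false;
    }
}
```

Updating the tally on membership changes keeps each JoinGroup's time under the group lock small, which is what matters for the blocked request handler threads shown in the jstack output.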
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Description: In our production cluster, we noticed that when a large consumer group (a few thousand members) is rebalancing, the produce latency of the coordinator broker can jump to several seconds. Group rebalance is a very frequent operation, it can be triggered by adding / removing / restarting a single member in the consumer group. When this happens, jstack shows all the request handler threads of the broker are waiting for group lock: {noformat} "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 nid=0x1b985 waiting on condition [0x7f98f1adb000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) at kafka.server.KafkaApis.handle(KafkaApis.scala:115) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:745){noformat} Besides one thread that is either doing GroupMetadata.supportsProtocols(): {noformat} "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 nid=0x1b984 runnable [0x7f98f1bdc000] java.lang.Thread.State: RUNNABLE at scala.collection.immutable.List.map(List.scala:284) at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68) at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265) at scala.collection.immutable.List.map(List.scala:288) at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265) at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270) at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188) at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152) at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241) at kafka.server.KafkaApis.handle(KafkaApis.scala:115) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:745){noformat} or GroupCoordinator.tryCompleteJoin {noformat} "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 nid=0x1ceff runnable [0x7fe8701ca000] java.lang.Thread.State: RUNNABLE at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248) at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139) at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139) at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139) at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247) at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259) at scala.collection.AbstractTraversable.filter(Traversable.scala:104) at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229) at
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Affects Version/s: (was: 0.10.2.0) 0.9.0.0 0.10.0.0 0.11.0.0 1.0.0
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Affects Version/s: 0.10.1.0 0.10.2.0
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Affects Version/s: 1.1.0
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Affects Version/s: 0.10.2.0

> Rebalancing large consumer group can block the coordinator broker for several
> seconds
>
> Key: KAFKA-7142
> URL: https://issues.apache.org/jira/browse/KAFKA-7142
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.10.2.0
> Reporter: Ying Zheng
> Assignee: Ying Zheng
> Priority: Minor
>
> In our production cluster, we noticed that when a large consumer group (a few
> thousand members) is rebalancing, the produce latency of the coordinator
> broker can jump to several seconds.
>
> When this happens, jstack shows that all the request handler threads of the
> broker are waiting for the group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x7f9a32b16000 nid=0x1b985 waiting on condition [0x7f98f1adb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
>
> All of them, that is, except one thread, which is executing either
> GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x7f9a32b14000 nid=0x1b984 runnable [0x7f98f1bdc000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.immutable.List.map(List.scala:284)
>         at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at scala.collection.immutable.List.map(List.scala:288)
>         at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin:
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x7fe9f6ad1000 nid=0x1ceff runnable [0x7fe8701ca000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
>         at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767){noformat}
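The stack traces above hint at why a few-thousand-member group hurts: each JoinGroup request recomputes the candidate-protocol intersection over all current members while holding the group lock. The following is a toy sketch of that cost, not Kafka's actual code (names are merely modeled on `GroupMetadata.candidateProtocols` from the trace): if every one of n joining members triggers a scan of the members so far, the rebalance does on the order of n² member scans in total, all serialized behind one lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CandidateProtocolsCost {
    static long memberScans = 0;

    // Toy analogue of GroupMetadata.candidateProtocols: intersect the
    // supported-protocol sets of every current member.
    static Set<String> candidateProtocols(List<Set<String>> members) {
        Set<String> candidates = null;
        for (Set<String> protocols : members) {
            memberScans++;                         // one scan per member
            if (candidates == null) {
                candidates = new HashSet<>(protocols);
            } else {
                candidates.retainAll(protocols);
            }
        }
        return candidates == null ? Collections.<String>emptySet() : candidates;
    }

    public static void main(String[] args) {
        List<Set<String>> group = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            group.add(new HashSet<>(Arrays.asList("range", "roundrobin")));
            // Each join recomputes the intersection (under the group lock
            // in the real broker), so the work grows quadratically.
            candidateProtocols(group);
        }
        // 1000 joins scan 1 + 2 + ... + 1000 = 500500 members in total.
        System.out.println(memberScans);
    }
}
```

Under this (assumed) cost model, the produce-latency spikes follow directly: the request handler threads blocked on the lock in the first trace cannot serve produce requests until the quadratic join work drains.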
[jira] [Assigned] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng reassigned KAFKA-7142: - Assignee: Ying Zheng

> Rebalancing large consumer group can block the coordinator broker for several
> seconds
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Description: (the full issue description and thread dumps quoted in the update above)
[jira] [Updated] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Zheng updated KAFKA-7142: -- Priority: Minor (was: Major) > Rebalancing large consumer group can block the coordinator broker for several > seconds > - > > Key: KAFKA-7142 > URL: https://issues.apache.org/jira/browse/KAFKA-7142 > Project: Kafka > Issue Type: Improvement >Reporter: Ying Zheng >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds
Ying Zheng created KAFKA-7142: - Summary: Rebalancing large consumer group can block the coordinator broker for several seconds Key: KAFKA-7142 URL: https://issues.apache.org/jira/browse/KAFKA-7142 Project: Kafka Issue Type: Improvement Reporter: Ying Zheng -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537665#comment-16537665 ] Vahid Hashemian commented on KAFKA-7141: [~kioria] thanks for reporting the issue. Do you see that output after manually committing offsets? Or before? It would be a great help if you could provide the steps that reproduce the issue. Thanks!

> kafka-consumer-group doesn't describe existing group
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 0.11.0.0, 1.0.1
> Reporter: Bohdana Panchenko
> Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard
> config section as described in
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and
> kafka-console-consumer. The akka-stream-kafka consumer configuration looks
> like this:
> {code}
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> {code}
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
>
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_
>
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
>
> That is strange. Can you please check the issue?
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537274#comment-16537274 ] Raman Gupta commented on KAFKA-4740: I'm still a little confused about how this should work, though: position requires a partition as an argument, but how can I get that information? I only see that the consumer knows the *set* of partitions assigned to it, not the partition that contains the message which failed to deserialize.

> Using new consumer API with a Deserializer that throws SerializationException
> can lead to infinite loop
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Reporter: Sébastien Launay
> Assignee: Sébastien Launay
> Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, such
> an exception is swallowed by the {{NetworkClient}} class and results in an
> infinite loop over which the client has no control, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another
> issue still remains. Indeed, the client can now catch the
> {{SerializationException}}, but the next call to {{Consumer#poll(long)}} will
> throw the same exception indefinitely. The following snippet (full example
> available on GitHub \[2\] for most released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerConfig,
>         new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (the third record has an invalid Integer value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO consumer.Consumer - Got 2 messages
> INFO consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
> INFO consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
> WARN consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN consumer.Consumer - Failed polling some records
> {noformat}
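One fragile workaround for the "which partition failed?" question in the comment above: in these versions the wrapping SerializationException carries no offset field, but its message (as seen in the logs quoted above, e.g. "Error deserializing key/value for partition topic-0 at offset 2") does name the partition and offset, so it can be parsed and used to seek one past the bad record. This is only an illustration of the idea, not a supported API; the message format is an implementation detail and may change between releases.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SkipBadRecord {
    // Matches the message format observed in the logs above; this is an
    // implementation detail of the client, not a stable contract.
    private static final Pattern FAILED_RECORD =
            Pattern.compile("partition (\\S+)-(\\d+) at offset (\\d+)");

    /**
     * Returns {topic, partition, offsetToSeekTo} parsed from the exception
     * message, or null if the message does not match the expected pattern.
     */
    static Object[] seekTarget(String exceptionMessage) {
        Matcher m = FAILED_RECORD.matcher(exceptionMessage);
        if (!m.find()) {
            return null;
        }
        return new Object[] {
                m.group(1),                     // topic
                Integer.parseInt(m.group(2)),   // partition
                Long.parseLong(m.group(3)) + 1  // one past the bad record
        };
    }

    // In the poll loop from the snippet above, one would then do roughly:
    //   } catch (SerializationException e) {
    //       Object[] t = seekTarget(e.getMessage());
    //       if (t != null) {
    //           consumer.seek(new TopicPartition((String) t[0], (Integer) t[1]), (Long) t[2]);
    //       }
    //   }
}
```

Note the greedy `\S+` still parses topics containing hyphens (for "my-topic-1 at offset 2" the last hyphen-digits pair is taken as the partition), though exotic topic names ending in `-<digits>` remain ambiguous.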
[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537264#comment-16537264 ] Raman Gupta commented on KAFKA-4740: Reading through the other linked issues and GitHub pull requests, it *seems* like the intended behavior now, if client code wishes to proactively seek past a bad message, is for the client code to use the `position` and `seek` methods on the underlying KafkaConsumer when this exception occurs. While the exception itself does not contain the offset information, the position of the consumer should be correct (see [this comment|https://issues.apache.org/jira/browse/KAFKA-5211?focusedCommentId=16024066&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16024066] from [~hachikuji]).

> Using new consumer API with a Deserializer that throws SerializationException
> can lead to infinite loop
[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bohdana Panchenko updated KAFKA-7141: - Description: (the issue description quoted in the comment above; this revision changed only the formatting markup)

> kafka-consumer-group doesn't describe existing group
[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bohdana Panchenko updated KAFKA-7141: - Description: (an intermediate revision of the issue description, superseded by the update above)

> kafka-consumer-group doesn't describe existing group
[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bohdana Panchenko updated KAFKA-7141:
-------------------------------------
Description:

I am running two consumers: an akka-stream-kafka consumer with the standard config section as described in [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and kafka-console-consumer.

The akka-stream-kafka consumer configuration looks like this:

{code}
kafka-clients {
  group.id = "myakkastreamkafka-1"
  enable.auto.commit = false
}
{code}

I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*

_Note: This will not show information about old Zookeeper-based consumers._
_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

That is strange. Can you please check the issue?
[jira] [Updated] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
[ https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bohdana Panchenko updated KAFKA-7141:
-------------------------------------
Description:

I am running two consumers: an akka-stream-kafka consumer with the standard config section as described in [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and kafka-console-consumer.

The akka-stream-kafka consumer configuration looks like this:

{code}
kafka-clients {
  group.id = "myakkastreamkafka-1"
  enable.auto.commit = false
}
{code}

I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*

_Note: This will not show information about old Zookeeper-based consumers._
_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

That is strange. Can you please check the issue?

was:

I am running two consumers: an akka-stream-kafka consumer with the standard config section as described in [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and kafka-console-consumer.

I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*

_Note: This will not show information about old Zookeeper-based consumers._
_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

That is strange. Can you please check the issue?
[jira] [Created] (KAFKA-7141) kafka-consumer-group doesn't describe existing group
Bohdana Panchenko created KAFKA-7141:
-------------------------------------
Summary: kafka-consumer-group doesn't describe existing group
Key: KAFKA-7141
URL: https://issues.apache.org/jira/browse/KAFKA-7141
Project: Kafka
Issue Type: Bug
Components: admin
Affects Versions: 1.0.1, 0.11.0.0
Reporter: Bohdana Panchenko

I am running two consumers: an akka-stream-kafka consumer with the standard config section as described in [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and kafka-console-consumer.

I am able to see both groups with the command

*kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*

_Note: This will not show information about old Zookeeper-based consumers._
_myakkastreamkafka-1_
_console-consumer-57171_

I am able to view details about the console consumer group:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group console-consumer-57171*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 /172.19.0.4 consumer-1_

But the command to describe my akka stream consumer gives me empty output:

*kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group myakkastreamkafka-1*

_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

That is strange. Can you please check the issue?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7140) Remove deprecated poll usages
Viktor Somogyi created KAFKA-7140:
----------------------------------
Summary: Remove deprecated poll usages
Key: KAFKA-7140
URL: https://issues.apache.org/jira/browse/KAFKA-7140
Project: Kafka
Issue Type: Improvement
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi

There are a couple of usages of the deprecated consumer poll(long) method in both test and non-test code. This JIRA aims to remove the non-test usages of the method.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
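For context, the deprecated method is the consumer's poll(long), superseded by poll(Duration) (KIP-266), which bounds the total time the call may block. The migration pattern can be sketched with a toy class — this is an illustration only, not Kafka's actual KafkaConsumer code:

```java
import java.time.Duration;

// Toy sketch of the deprecation pattern: the long-millis overload is kept
// only for compatibility and delegates to the Duration overload; call sites
// migrate from poll(100) to poll(Duration.ofMillis(100)).
public class PollMigration {
    /** @deprecated use {@link #poll(Duration)} instead */
    @Deprecated
    static String poll(long timeoutMs) {
        return poll(Duration.ofMillis(timeoutMs));
    }

    static String poll(Duration timeout) {
        // A real consumer would fetch records here; we just report the bound.
        return "polled for " + timeout.toMillis() + " ms";
    }

    public static void main(String[] args) {
        // Old call site: poll(100);  new call site:
        System.out.println(poll(Duration.ofMillis(100))); // polled for 100 ms
    }
}
```

Removing the non-test callers then amounts to rewriting each `poll(n)` as `poll(Duration.ofMillis(n))`.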
[jira] [Commented] (KAFKA-7139) Support to exclude the internal topics in kafka-topics.sh command
[ https://issues.apache.org/jira/browse/KAFKA-7139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536761#comment-16536761 ]

ASF GitHub Bot commented on KAFKA-7139:
---------------------------------------

chia7712 opened a new pull request #5349: KAFKA-7139 Support to exclude the internal topics in kafka-topics.sh …
URL: https://github.com/apache/kafka/pull/5349

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7139) Support to exclude the internal topics in kafka-topics.sh command
Chia-Ping Tsai created KAFKA-7139:
----------------------------------
Summary: Support to exclude the internal topics in kafka-topics.sh command
Key: KAFKA-7139
URL: https://issues.apache.org/jira/browse/KAFKA-7139
Project: Kafka
Issue Type: Task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai

Usually we have no interest in internal topics, but kafka-topics.sh offers no simple way to exclude them: we have to write a regular expression that excludes the internal topics. This issue proposes adding a flag (perhaps "exclude-internal") to kafka-topics.sh. If the user sets the flag, all internal topics will be excluded by the following commands: "list", "describe", "delete" and "alter".

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
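The proposed flag would essentially move a filter users write today into the tool itself. A minimal sketch of that filter, under the assumption that internal topics are recognized by their double-underscore prefix (e.g. __consumer_offsets); Kafka's real check may differ:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of what an "--exclude-internal" flag could do to a topic listing.
// Assumption: internal topics are those whose names start with "__".
public class ExcludeInternalTopics {
    static List<String> excludeInternal(List<String> topics) {
        return topics.stream()
                .filter(t -> !t.startsWith("__")) // drop internal topics
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = List.of("__consumer_offsets", "orders",
                                   "__transaction_state", "clicks");
        System.out.println(excludeInternal(all)); // [orders, clicks]
    }
}
```

With the flag in place, "list", "describe", "delete" and "alter" would apply such a filter before acting on the matched topics.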
[jira] [Commented] (KAFKA-4693) Consumer subscription change during rebalance causes exception
[ https://issues.apache.org/jira/browse/KAFKA-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536693#comment-16536693 ]

Richard Yu commented on KAFKA-4693:
-----------------------------------

Hi [~hachikuji] Is the validation performed when the request future succeeds (i.e. something that occurs after {{ConsumerRebalanceListener}}'s method is called)? Because when I first ran through the code, there does not appear to be any place where such a validation occurs. (I was looking in the proximity of {{ConsumerCoordinator#onJoinComplete()}}.)

> Consumer subscription change during rebalance causes exception
>
> Key: KAFKA-4693
> URL: https://issues.apache.org/jira/browse/KAFKA-4693
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Jason Gustafson
> Priority: Minor
> Fix For: 2.1.0
>
> After every rebalance, the consumer validates that the assignment received contains only partitions from topics that were subscribed. If not, then we raise an exception to the user. It is possible for a wakeup or an interrupt to leave the consumer with a rebalance in progress (e.g. with a JoinGroup to the coordinator in-flight). If the user then changes the topic subscription, then this validation upon completion of the rebalance will fail. We should probably detect the subscription change, eat the exception, and request another rebalance.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
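The post-rebalance validation the issue describes is, in essence, a set-containment check. A self-contained sketch of that check and of how a mid-rebalance subscription change trips it — an illustration, not ConsumerCoordinator's actual code:

```java
import java.util.Set;

// Sketch (assumed shape, not Kafka's real implementation): after a rebalance
// completes, every topic in the received assignment must still be in the
// current subscription; otherwise the consumer raises an error. If the user
// changed the subscription while a JoinGroup was in flight, the assignment
// reflects the OLD subscription and the check fails.
public class AssignmentValidation {
    static boolean assignmentMatchesSubscription(Set<String> subscribedTopics,
                                                 Set<String> assignedTopics) {
        return subscribedTopics.containsAll(assignedTopics);
    }

    public static void main(String[] args) {
        // Rebalance began while subscribed to {A}; the user switched the
        // subscription to {B} before it completed, so the stale assignment
        // for A no longer matches.
        System.out.println(assignmentMatchesSubscription(Set.of("B"), Set.of("A"))); // false
    }
}
```

The fix suggested in the issue amounts to: when this check fails *and* the subscription is known to have changed during the rebalance, swallow the error and trigger another rebalance instead of surfacing an exception.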
[jira] [Assigned] (KAFKA-7132) Consider adding faster form of rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-7132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu reassigned KAFKA-7132:
---------------------------------
Assignee: Richard Yu

> Consider adding faster form of rebalancing
>
> Key: KAFKA-7132
> URL: https://issues.apache.org/jira/browse/KAFKA-7132
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Richard Yu
> Assignee: Richard Yu
> Priority: Critical
> Labels: performance
>
> Currently, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in a lag which some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data. To avoid this, one option would be to allow the current consumer to start processing not from the last checkpointed offset (70 in the example) but from 120, where it recovers. Meanwhile, a new KafkaConsumer would be instantiated to read from offset 70 concurrently with the old process, and would be terminated once it reaches 120. In this manner, a considerable amount of lag can be avoided, particularly since the old consumer could proceed as if nothing had happened.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
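The arithmetic in the example above can be made explicit. A small sketch (numbers taken from the issue; the method name is ours, not a Kafka API):

```java
// Worked version of the issue's example: checkpoint at offset 70, crash at
// 100, recovery at 120. Restarting from the checkpoint means re-reading the
// whole range [70, 120), i.e. 120 - 70 = 50 offsets, before catching up. The
// proposal lets the recovered consumer resume at 120 immediately while a
// temporary consumer backfills [70, 120) concurrently.
public class RebalanceLag {
    static long catchUpRange(long checkpointOffset, long recoveryOffset) {
        return recoveryOffset - checkpointOffset; // offsets to re-read after restart
    }

    public static void main(String[] args) {
        System.out.println(catchUpRange(70, 120)); // 50
    }
}
```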