[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433653#comment-17433653 ]
Andrei D edited comment on KAFKA-12984 at 10/25/21, 12:39 PM (comment author: andy_dufresne):
--------------------------------------------------------------

The entire group used Kafka client 2.8.1 with 'CooperativeStickyAssignor'.

Here are the broker logs: we see that the broker skipped assignment for generations 10-12 since the ConsumerCoordinator was stuck on its side

!image-2021-10-25-11-53-40-221.png!

and here are the logs from the consumers for the same timeframe:
{code:java}
2021-10-20 10:14:27.878 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:30.566 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:34.913 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:34.920 ERROR {spanId=, traceId=} [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18217: Unable to read a record from Kafka topics '[qa-qa-cf-events]': java.lang.IllegalStateException: Retries exhausted: 3/3
2021-10-20T13:14:34.928+03:00 Caused by: java.lang.IllegalStateException: Assignor supporting the COOPERATIVE protocol violates its requirements
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.validateCooperativeAssignment(ConsumerCoordinator.java:668)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:592)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:247)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:426)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
2021-10-20T13:14:34.928+03:00 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
2021-10-20T13:14:34.928+03:00 at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$poll$4(ReactiveKafkaConsumer.java:131)
2021-10-20T13:14:34.928+03:00 at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$runOnPollingThread$0(ReactiveKafkaConsumer.java:101)
2021-10-20T13:14:34.928+03:00 at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
2021-10-20T13:14:34.928+03:00 at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
2021-10-20 10:14:34.921 WARN {spanId=, traceId=} [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18228: A failure has been reported for Kafka topics '[qa-qa-cf-events]': java.lang.IllegalStateException: Retries exhausted: 3/3
{code}

UPD: We ran another test, and despite manually pulling in and using the 'CooperativeStickyAssignor' from the Kafka 3.0 client in our application, we still see the same errors, so it seems the issue still exists (maybe something in the ConsumerCoordinator class).


> Cooperative sticky assignor can get stuck with invalid SubscriptionState
> input metadata
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Blocker
> Fix For: 2.8.1, 3.0.0
>
> Attachments: image-2021-10-25-11-53-40-221.png
>
>
> Some users have reported seeing their consumer group become stuck in the
> CompletingRebalance phase when using the cooperative-sticky assignor. Based
> on the request metadata we were able to deduce that multiple consumers were
> reporting the same partition(s) in their "ownedPartitions" field of the
> consumer protocol.
> Since this is an invalid state, the input causes the
> cooperative-sticky assignor to detect that something is wrong and throw an
> IllegalStateException. If the consumer application is set up to simply retry,
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's
> SubscriptionState, which was assumed to always be up to date. However there
> may be cases where the consumer has dropped out of the group but fails to
> clear the SubscriptionState, allowing it to report some partitions as owned
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of
> improper input conditions by invalidating the "ownedPartitions" in cases of
> double ownership, and (b) shore up the ConsumerCoordinator logic to better
> handle rejoining the group and keeping its internal state consistent. See
> KAFKA-12983 for more details on (b)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
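
For readers following point (a) of the issue description, here is a minimal sketch of what "invalidating the ownedPartitions in cases of double ownership" could look like. It is not the Kafka client's implementation: the class OwnedPartitionsSanitizer, its method names, the member ids, and the use of plain strings in place of TopicPartition are all assumptions made for illustration. The idea is simply that any partition claimed by more than one member is treated as owned by nobody before the assignor runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the Kafka client.
class OwnedPartitionsSanitizer {

    // Returns the partitions that more than one member reports as owned.
    static Set<String> doublyOwned(Map<String, List<String>> ownedByMember) {
        Set<String> seen = new HashSet<>();
        Set<String> conflicts = new TreeSet<>();
        for (List<String> owned : ownedByMember.values()) {
            // De-duplicate within one member so a repeated entry is not counted as a conflict.
            for (String partition : new HashSet<>(owned)) {
                if (!seen.add(partition)) {
                    conflicts.add(partition);
                }
            }
        }
        return conflicts;
    }

    // Returns a copy of the input in which every doubly owned partition
    // has been dropped from all members' claims.
    static Map<String, List<String>> invalidateConflicts(Map<String, List<String>> ownedByMember) {
        Set<String> conflicts = doublyOwned(ownedByMember);
        Map<String, List<String>> cleaned = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : ownedByMember.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String partition : entry.getValue()) {
                if (!conflicts.contains(partition)) {
                    kept.add(partition);
                }
            }
            cleaned.put(entry.getKey(), kept);
        }
        return cleaned;
    }

    public static void main(String[] args) {
        // Made-up member ids and partition names.
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("member-a", Arrays.asList("events-0", "events-1"));
        owned.put("member-b", Arrays.asList("events-1", "events-2"));
        System.out.println("conflicts: " + doublyOwned(owned));
        System.out.println("cleaned:   " + invalidateConflicts(owned));
    }
}
```

With input sanitized this way, an assignor would see the contested partitions as unowned, so no member's stale claim could trip the cooperative-protocol validation. Whether contested partitions should instead stay with one of the claimants is a design choice this sketch does not address.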