[ https://issues.apache.org/jira/browse/KAFKA-16808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Badhusha Muhammed updated KAFKA-16808:
--------------------------------------
Description:
Even though only one consumer is running for a group.id, the JoinGroup response on group (re-)join contains 2 different members. The assignment therefore splits the partitions between the two members, and only half of the partitions are processed.

Log for group join and partition assignment:
{code:java}
24/05/13 10:26:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 155000 milliseconds, but spent 391883 milliseconds
24/05/13 10:26:28 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
24/05/13 10:26:28 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Lost previously assigned partitions topic-0 topic-1 topic-2 topic-3 topic-4 topic-5 topic-6 topic-7
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] (Re-)joining group
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Group coordinator <coordinator> (id: 2147482646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Rebalance failed.
org.apache.kafka.common.errors.DisconnectException
24/05/13 10:26:28 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 436704.
24/05/13 10:26:28 INFO DAGScheduler: Executor lost: 436704 (epoch 0)
24/05/13 10:26:28 INFO BlockManagerMaster: Removed 436704 successfully in removeExecutor
24/05/13 10:26:28 INFO YarnClusterScheduler: Executor 436704 on ************** killed by driver.
24/05/13 10:26:28 INFO ExecutorMonitor: Executor 436704 is removed. Remove reason statistics: (gracefully decommissioned: 0, decommision unfinished: 0, driver killed: 436456, unexpectedly exited: 399).
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Discovered group coordinator <coordinator> (id: 2147482646 rack: null)
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] (Re-)joining group
24/05/13 10:26:31 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Finished assignment for group at generation 6: {consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3-d4448c3e-8f23-490b-b800-be15a14efd32=Assignment(partitions=[topic-4, topic-5, topic-6, topic-7]), consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3-0daf3497-feac-4eee-a5ca-596e2b2e1649=Assignment(partitions=[topic-0, topic-1, topic-2, topic-3])}
24/05/13 10:26:31 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3, groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0] Adding newly assigned partitions: topic-0 topic-1 topic-2 topic-3
{code}

Could this be caused by the generation reset performed in the rebalance code in v2.8.0, which was fixed in 2.8.1 by https://issues.apache.org/jira/browse/KAFKA-13214?
{code:java}
// Excerpt from the JoinGroup retry loop in AbstractCoordinator (v2.8.0)
else {
    final RuntimeException exception = future.exception();
    // we do not need to log error for memberId required,
    // since it is not really an error and is transient
    if (!(exception instanceof MemberIdRequiredException)) {
        log.info("Rebalance failed.", exception);
    }

    resetJoinGroupFuture();
    if (exception instanceof UnknownMemberIdException ||
        exception instanceof RebalanceInProgressException ||
        exception instanceof IllegalGenerationException ||
        exception instanceof MemberIdRequiredException)
        continue;
    else if (!future.isRetriable())
        throw exception;

    resetStateAndRejoin(); // <- this should be removed
    timer.sleep(rebalanceConfig.retryBackoffMs);
}
{code}

* Group join request 1 failed with generation(generationId=1, memberId=1). The join actually completed on the coordinator side, but the client received a failure response because the coordinator connection was disconnected before the response arrived (visible in the log above).
* The client then reset its generation and started a rejoin before the previous member had left the group, so the new JoinGroup response contains 2 members.

We therefore request the same patch for v2.8.0, so that the response does not contain multiple members when only one consumer is running for a group.id.
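The sequence in the two bullets can be illustrated with a toy model. This is a hypothetical sketch, not Kafka's group coordinator: `ToyCoordinator`, `join` and `membersAfterLostResponse` are invented names, the JoinGroup protocol is reduced to "an empty member ID gets a fresh ID, a known ID is reused", and generations, session timeouts and the MemberIdRequiredException round trip are ignored. It only shows why clearing the member ID before retrying a join whose response was lost can leave the coordinator with two members for a single consumer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

final class RejoinRaceSketch {

    /** Hypothetical toy coordinator: it only remembers which member IDs have joined. */
    static final class ToyCoordinator {
        private final Map<String, String> members = new LinkedHashMap<>(); // memberId -> clientId

        /** An empty memberId means "unknown member", so the coordinator mints a fresh ID. */
        String join(String clientId, String memberId) {
            String id = memberId.isEmpty() ? clientId + "-" + UUID.randomUUID() : memberId;
            members.put(id, clientId); // re-joining with a known ID does not add a member
            return id;
        }

        int memberCount() {
            return members.size();
        }
    }

    /**
     * One consumer retries a JoinGroup whose response was lost.
     * resetOnRetriableError=true models the reported 2.8.0 path (resetStateAndRejoin() clears the member ID);
     * false models keeping the member ID across the retry.
     */
    static int membersAfterLostResponse(boolean resetOnRetriableError) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String clientId = "consumer-driver-0-3"; // shortened, illustrative client ID

        // The consumer already holds a member ID from an earlier generation.
        String memberId = coordinator.join(clientId, "");

        // JoinGroup #1 completes on the coordinator, but the client only sees a DisconnectException.
        coordinator.join(clientId, memberId);

        if (resetOnRetriableError) {
            // Generation reset: the next JoinGroup is sent as a brand-new member.
            memberId = "";
        }

        // JoinGroup #2 after the retry backoff. The toy model never expires the old ID,
        // so a fresh ID makes the group look like two consumers.
        coordinator.join(clientId, memberId);
        return coordinator.memberCount();
    }

    public static void main(String[] args) {
        System.out.println("with reset: members=" + membersAfterLostResponse(true));
        System.out.println("without reset: members=" + membersAfterLostResponse(false));
    }
}
```

Running it prints `members=2` for the reset path and `members=1` when the member ID is kept, which is consistent with the two member IDs (d4448c3e... and 0daf3497...) that appear in the "Finished assignment" log line for one running consumer.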
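In the "Finished assignment" log line, topic-0 to topic-7 are split 4/4 between the two member IDs, and the running consumer only adds topic-0 to topic-3. Assuming the group uses the default RangeAssignor (an assumption; the report does not state the configured assignor), this split follows from sorting the member IDs and handing out consecutive partition ranges. The sketch below is a simplified, single-topic illustration, not Kafka's RangeAssignor class; `RangeSplitSketch` and `assign` are hypothetical names, and the member IDs are shortened forms of those in the log.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeSplitSketch {

    /**
     * Simplified range-style assignment for one topic: members are sorted by ID,
     * each gets numPartitions / numMembers consecutive partitions, and the first
     * numPartitions % numMembers members get one extra partition.
     */
    static Map<String, List<String>> assign(String topic, int numPartitions, List<String> memberIds) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        if (memberIds.isEmpty()) {
            return result; // nothing to assign to
        }
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();

        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<String> partitions = new ArrayList<>();
            for (int p = next; p < next + count; p++) {
                partitions.add(topic + "-" + p);
            }
            next += count;
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // Shortened member IDs from the log: one backs the running consumer, the other does not.
        List<String> members = List.of("consumer-3-d4448c3e", "consumer-3-0daf3497");
        assign("topic", 8, members).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

It prints `consumer-3-0daf3497 -> [topic-0, topic-1, topic-2, topic-3]` followed by `consumer-3-d4448c3e -> [topic-4, topic-5, topic-6, topic-7]`, the same split as the log. Since the running consumer only added topic-0 to topic-3, the other member ID holds topic-4 to topic-7 with no running consumer behind it.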
> Consumer join Group response contains 2 different members
> ---------------------------------------------------------
>
>                 Key: KAFKA-16808
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16808
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Badhusha Muhammed
>            Priority: Critical
>             Fix For: 2.8.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)