[ 
https://issues.apache.org/jira/browse/KAFKA-16808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badhusha Muhammed updated KAFKA-16808:
--------------------------------------
    Description: 
Even though only one consumer is running for a group.id, the JoinGroup 
response on group (re-)join contains 2 different members. The assignor 
therefore splits the partitions between those two members, and the single 
live consumer processes only half of the partitions. 

Log for the group join and partition assignment: 
{code:java}
 24/05/13 10:26:28 WARN ProcessingTimeExecutor: Current batch is falling 
behind. The trigger interval is 155000 milliseconds, but spent 391883 
milliseconds
24/05/13 10:26:28 INFO ConsumerCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Giving away all assigned partitions as lost since generation has been 
reset,indicating that consumer is no longer part of the group
24/05/13 10:26:28 INFO ConsumerCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Lost previously assigned partitions topic-0 topic-1 topic-2 topic-3 topic-4 
topic-5 topic-6 topic-7
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 (Re-)joining group
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Group coordinator <coordinator> (id: 2147482646 rack: null) is unavailable or 
invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Rebalance failed.
org.apache.kafka.common.errors.DisconnectException
24/05/13 10:26:28 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling 
executor 436704.
24/05/13 10:26:28 INFO DAGScheduler: Executor lost: 436704 (epoch 0)
24/05/13 10:26:28 INFO BlockManagerMaster: Removed 436704 successfully in 
removeExecutor
24/05/13 10:26:28 INFO YarnClusterScheduler: Executor 436704 on ************** 
killed by driver.
24/05/13 10:26:28 INFO ExecutorMonitor: Executor 436704 is removed. Remove 
reason statistics: (gracefully decommissioned: 0, decommision unfinished: 0, 
driver killed: 436456, unexpectedly exited: 399).
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Discovered group coordinator <coordinator> (id: 2147482646 rack: null)
24/05/13 10:26:28 INFO AbstractCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 (Re-)joining group
24/05/13 10:26:31 INFO ConsumerCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Finished assignment for group at generation 6: 
{consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3-d4448c3e-8f23-490b-b800-be15a14efd32=Assignment(partitions=[topic-4,
 topic-5, topic-6, topic-7]), 
consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3-0daf3497-feac-4eee-a5ca-596e2b2e1649=Assignment(partitions=[topic-0,
 topic-1, topic-2, topic-3])}
24/05/13 10:26:31 INFO ConsumerCoordinator: [Consumer 
clientId=consumer-spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0-3,
 
groupId=spark-kafka-source-c48ddb22-762b-4aa3-bf60-7541efb19b2a-1436000591-driver-0]
 Adding newly assigned partitions: topic-0 topic-1 topic-2 topic-3{code}
 

Could this be caused by the generation reset performed in the rebalance code 
in v2.8.0, which was eventually fixed in version 2.8.1? 
https://issues.apache.org/jira/browse/KAFKA-13214
{code:java}
 else {
    final RuntimeException exception = future.exception();

    // we do not need to log error for memberId required,
    // since it is not really an error and is transient
    if (!(exception instanceof MemberIdRequiredException)) {
        log.info("Rebalance failed.", exception);
    }

    resetJoinGroupFuture();
    if (exception instanceof UnknownMemberIdException ||
        exception instanceof RebalanceInProgressException ||
        exception instanceof IllegalGenerationException ||
        exception instanceof MemberIdRequiredException)
        continue;
    else if (!future.isRetriable())
        throw exception;

    resetStateAndRejoin(); // <- this should be removed
    timer.sleep(rebalanceConfig.retryBackoffMs);
}{code}
 

 
 * JoinGroup request 1, sent with generation(generationId=1, memberId=1), 
failed on the client. The join completed successfully on the coordinator 
side, but the coordinator was disconnected before the response arrived, as 
the log above shows.

 * The generation was therefore reset and a rejoin started before the 
previous member had left the group, so the new response contains 2 members. 
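
The sequence in the two points above can be illustrated with a toy model. 
This is only a sketch under stated assumptions: ToyGroupCoordinator and 
StaleMemberDemo are hypothetical names, not Kafka classes, and the model 
ignores session timeouts, generations and the real assignor. It only shows 
that retrying with a cleared member id registers a second member while the 
first one is still known to the coordinator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a group coordinator; hypothetical, not Kafka's implementation. */
class ToyGroupCoordinator {
    private final List<String> members = new ArrayList<>();
    private int nextId = 1;

    /** A known memberId rejoins in place; anything else (e.g. empty) registers a new member. */
    String join(String memberId) {
        if (!memberId.isEmpty() && members.contains(memberId)) {
            return memberId;
        }
        String assigned = "member-" + nextId++;
        members.add(assigned);
        return assigned;
    }

    /** Splits partitions 0..partitionCount-1 into contiguous chunks, one per registered member. */
    Map<String, List<Integer>> assign(int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = members.get(p * members.size() / partitionCount);
            assignment.get(owner).add(p);
        }
        return assignment;
    }
}

class StaleMemberDemo {
    /** Returns the partitions owned by the one live consumer after a lost JoinGroup response. */
    static List<Integer> partitionsAfterLostResponse(boolean resetMemberIdOnDisconnect) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();
        String memberId = coordinator.join("");   // initial join succeeds
        coordinator.join(memberId);               // rejoin is processed, but the response is lost

        // Behaviour in question: clear the member id after the disconnect, or keep it.
        String retryId = resetMemberIdOnDisconnect ? "" : memberId;
        String finalId = coordinator.join(retryId); // retry after rediscovering the coordinator

        return coordinator.assign(8).get(finalId);
    }
}
```

In this model, StaleMemberDemo.partitionsAfterLostResponse(true) returns four 
of the eight partitions, because the stale member still holds the other four, 
while partitionsAfterLostResponse(false) returns all eight. Which half the 
live consumer receives is an artifact of the toy assignment and is not meant 
to match the log.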

We are therefore requesting that the same patch be applied to v2.8.0, so that 
the response does not contain multiple members when only one consumer is 
running for a group.id. 


> Consumer join Group response contains 2 different members
> ---------------------------------------------------------
>
>                 Key: KAFKA-16808
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16808
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Badhusha Muhammed
>            Priority: Critical
>             Fix For: 2.8.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
