[ 
https://issues.apache.org/jira/browse/KAFKA-19222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17949018#comment-17949018
 ] 

Travis Bischel commented on KAFKA-19222:
----------------------------------------

Re-issuing with all fields is the solution I've stumbled on. KIP-848 should be 
updated to specifically mention that if a request fails in any capacity, it 
should be re-issued with all fields present -- that's the real root of this 
issue.

Specifically, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ResponseHandling
 -- this should mention that if a request fails in _any capacity_ -- due to 
network issues, anything -- it must not be retried as a delta; the full state 
has to be re-sent.
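A minimal sketch of that client-side rule (names here are illustrative assumptions, not kgo or the Java client API): after any heartbeat failure, the next request carries the full owned-partition list instead of the null "unchanged" delta.

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatFieldPolicy {
    private final List<Integer> ownedPartitions = new ArrayList<>();
    private boolean lastRequestFailed = false;

    public void setOwned(List<Integer> parts) {
        ownedPartitions.clear();
        ownedPartitions.addAll(parts);
    }

    public void onResponse(boolean success) {
        lastRequestFailed = !success;
    }

    // Value for the topicPartitions field of the next heartbeat: null means
    // "unchanged" per KIP-848, but after a failure we cannot assume the
    // broker saw the previous request, so the full list is re-sent.
    public List<Integer> nextTopicPartitionsField(boolean changedSinceLastAck) {
        if (lastRequestFailed || changedSinceLastAck) {
            return new ArrayList<>(ownedPartitions);
        }
        return null;
    }
}
```

With this policy, a retry after a lost response repeats the full state, so the broker never has to guess what a null field meant.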

As written, the KIP says fields only need to be sent if they have changed, i.e.:
 * I send that I own partition A; the broker replies success
 * I send null -- still own A -- but the response is missed
 * I retry with null -- _the broker should not have cleared state from the first 
request, because I never said I gave up partition A_

Looking deeper, this looks to be primarily a limitation of 
{{ModernGroupMember.java}} not storing owned partitions, even though it stores 
everything else. Other fields -- SubscribedTopicRegex, SubscribedTopicNames -- 
are handled as a delta between what the broker stored from the last request and 
the new state in the current request. Owned partitions ({{ownedPartitions}} in 
the GroupMetadataManager) are instead calculated as a delta between what's in 
the current request and what the broker wants the target assignment to be.
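As a toy model of the fix implied above (an assumption about a possible approach, not actual GroupMetadataManager code): if the broker remembered the last explicitly reported owned set, a null retry could be resolved against that memory instead of failing the check.

```java
import java.util.List;

public class MemberOwnedModel {
    // What the broker last saw explicitly reported for this member
    private List<String> lastReportedOwned = null;

    // A null topicPartitions field means "unchanged since my last request"
    public boolean retryIsValid(List<String> topicPartitions, List<String> assigned) {
        if (topicPartitions != null) {
            lastReportedOwned = topicPartitions;
        }
        List<String> effective =
            (topicPartitions != null) ? topicPartitions : lastReportedOwned;
        // Valid when the effective owned set is a subset of the assignment
        return effective != null && assigned.containsAll(effective);
    }
}
```

The point of the model: once the broker stores the last explicit report, the second and third requests in the sequence above resolve to "still owns A" rather than "owns nothing".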

I plan to open a separate issue about irrecoverable final commits when 
STALE_MEMBER_EPOCH is encountered.

> Invalid FENCED_MEMBER_EPOCH error to ConsumerGroupHeartbeat
> -----------------------------------------------------------
>
>                 Key: KAFKA-19222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19222
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 4.0.0, 4.0.1
>            Reporter: Travis Bischel
>            Priority: Major
>
> {{throwIfConsumerGroupMemberEpochIsInvalid}} is meant to check the epoch in 
> heartbeat requests. If a ConsumerGroupHeartbeat response is lost (connection 
> dies before the client reads the response), the client may issue a 
> ConsumerGroupHeartbeat with the old epoch. 
> {{throwIfConsumerGroupMemberEpochIsInvalid}} means to allow this with this 
> conditional:
> {code:java}
>         } else if (receivedMemberEpoch < member.memberEpoch()) {
>             // If the member comes with the previous epoch and has a subset of the current assignment partitions,
>             // we accept it because the response with the bumped epoch may have been lost.
>             if (receivedMemberEpoch != member.previousMemberEpoch() || !isSubset(ownedTopicPartitions, member.assignedPartitions())) {
>                 throw new FencedMemberEpochException("The consumer group member has a smaller member "
>                     + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
>                     + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
>             }
>         }
> {code}
> However, {{isSubset}} immediately returns false if {{ownedPartitions}} is 
> null. Clients are not meant to send the {{ownedPartitions}} field 
> (ConsumerGroupHeartbeatRequest.TopicPartitions) _unless_ it has changed.
> As a concrete example, here are broker logs for one client. In this flow:
> * A ConsumerGroupHeartbeat is being issued within the client
> * Concurrently, I am trying to leave the group -- this cancels the in-flight 
> ConsumerGroupHeartbeat from the client side
> * I issue OffsetCommit (before the group is left)
> * The OffsetCommit fails with STALE_MEMBER_EPOCH
> * I issue a ConsumerGroupHeartbeat with _only_ the Group, MemberID, and 
> MemberEpoch fields set -- i.e. _nothing else is changing, just give me the 
> latest epoch_
> * Broker side, the broker finally processes the initial ConsumerGroupHeartbeat 
> (already canceled client side) -- in the logs you can see the epoch is bumped 
> to 7 and the heartbeat is successful
> * Broker side, the broker processes my retry as a _duplicate 
> ConsumerGroupHeartbeat_ and replies FENCED_MEMBER_EPOCH
> * Client side, I bail on committing and finally leave the group, sending a 
> ConsumerGroupHeartbeat with memberEpoch -1
> * Broker side, the final heartbeat is processed successfully
> Broker side logs:
> {code}
> 2025-04-30 17:22:26,024 
> [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG 
> kafka.request.logger - Completed 
> request:{"isForwarded":false,"requestHeader":{"requestApiKey":8,"requestApiVersion":9,"correlationId":0,"clientId":"kgo","requestApiKeyName":"OFFSET_COMMIT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","generationIdOrMemberEpoch":6,"memberId":"6j403VCp6yHIJrSlfWTtYg==","groupInstanceId":null,"topics":[{"name":"c5fd69bff34c4bde0f955fcf6de5d1cc2487067028e2383b6106804d3a9949b5","partitions":[{"partitionIndex":4,"committedOffset":45435,"committedLeaderEpoch":0,"committedMetadata":"6j403VCp6yHIJrSlfWTtYg=="},{"partitionIndex":3,"committedOffset":45371,"committedLeaderEpoch":0,"committedMetadata":"6j403VCp6yHIJrSlfWTtYg=="}]}]},"response":{"throttleTimeMs":0,"topics":[{"name":"c5fd69bff34c4bde0f955fcf6de5d1cc2487067028e2383b6106804d3a9949b5","partitions":[{"partitionIndex":4,"errorCode":113},{"partitionIndex":3,"errorCode":113}]}]},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":175.381,"requestQueueTimeMs":0.027,"localTimeMs":0.037,"remoteTimeMs":175.075,"throttleTimeMs":0,"responseQueueTimeMs":0.118,"sendTimeMs":0.121,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,121 
> [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG 
> kafka.request.logger - Completed 
> request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":14,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":6,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":0,"errorMessage":null,"memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":7,"heartbeatIntervalMs":5000,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:48238-2-153","totalTimeMs":273.886,"requestQueueTimeMs":0.021,"localTimeMs":0.025,"remoteTimeMs":273.594,"throttleTimeMs":0,"responseQueueTimeMs":0.062,"sendTimeMs":0.183,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"kgo","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,135 
> [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG 
> kafka.request.logger - Completed 
> request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":1,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":6,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":110,"errorMessage":"The
>  consumer group member has a smaller member epoch (6) than the one known by 
> the group coordinator (7). The member must abandon all its partitions and 
> rejoin.","memberId":null,"memberEpoch":0,"heartbeatIntervalMs":0,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":3.304,"requestQueueTimeMs":0.095,"localTimeMs":0.091,"remoteTimeMs":2.67,"throttleTimeMs":0,"responseQueueTimeMs":0.306,"sendTimeMs":0.14,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> 2025-04-30 17:22:26,153 
> [data-plane-kafka-network-thread-2-ListenerName(PLAINTEXT)-PLAINTEXT-2] DEBUG 
> kafka.request.logger - Completed 
> request:{"isForwarded":false,"requestHeader":{"requestApiKey":68,"requestApiVersion":1,"correlationId":2,"clientId":"kgo","requestApiKeyName":"CONSUMER_GROUP_HEARTBEAT"},"request":{"groupId":"c24d1994653ddbaa5ff68d14c54c8733a7f02f732f4da4b5811f80ce0b247c2d","memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":-1,"instanceId":null,"rackId":null,"rebalanceTimeoutMs":-1,"subscribedTopicNames":null,"subscribedTopicRegex":null,"serverAssignor":null,"topicPartitions":null},"response":{"throttleTimeMs":0,"errorCode":0,"errorMessage":null,"memberId":"6j403VCp6yHIJrSlfWTtYg==","memberEpoch":-1,"heartbeatIntervalMs":0,"assignment":null},"connection":"127.0.0.1:9094-127.0.0.1:56328-2-197","totalTimeMs":6.528,"requestQueueTimeMs":0.088,"localTimeMs":0.116,"remoteTimeMs":6.166,"throttleTimeMs":0,"responseQueueTimeMs":0.053,"sendTimeMs":0.102,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
> {code}
> This can be very easily triggered by always issuing ConsumerGroupHeartbeat 
> twice, ignoring the first response, in any test that has rebalancing.
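The fencing behavior described in the quoted report can be reproduced in miniature. This is a sketch of the quoted conditional, not the exact broker implementation; {{isSubset}} is modeled after its described null behavior.

```java
import java.util.List;

public class EpochCheck {
    static boolean isSubset(List<Integer> owned, List<Integer> assigned) {
        if (owned == null) {
            return false; // the problematic short-circuit for "unchanged"
        }
        return assigned.containsAll(owned);
    }

    // Mirrors the quoted check: a smaller received epoch is tolerated only
    // when it equals the previous epoch AND the owned partitions pass isSubset.
    static boolean isFenced(int received, int current, int previous,
                            List<Integer> owned, List<Integer> assigned) {
        if (received >= current) {
            return false;
        }
        return received != previous || !isSubset(owned, assigned);
    }
}
```

Plugging in the values from the logs (received epoch 6, current 7, previous 6, null topicPartitions) shows the retry is fenced purely because of the null short-circuit; the same retry with the partitions spelled out would be accepted.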



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
