guozhangwang commented on pull request #11451: URL: https://github.com/apache/kafka/pull/11451#issuecomment-964444781
Hi @showuon, I think I agree with you that if we are going to encode both `ownedPartitions` and `generation` into the protocol in the new bytecode, then we do not need to try to "fix" 2), but only detect it and fail. For this specific case, I'm actually thinking that we could consider a slightly different version of `resetStateAndGeneration` that only resets the generation id, but not the member id field of the `Generation`. More specifically, we have three callers of `resetStateAndRejoin`, and one of them is `resetGenerationOnResponseError` (the other two should always reset both the generation and the member id). `resetGenerationOnResponseError` in turn has several callers:

* UNKNOWN_MEMBER_ID in JoinGroup: reset both memberId and generation.
* UNKNOWN_MEMBER_ID in SyncGroup: reset both.
* ILLEGAL_GENERATION in SyncGroup: reset generation only.
* UNKNOWN_MEMBER_ID in Heartbeat: reset both.
* ILLEGAL_GENERATION in Heartbeat: reset generation only.
* UNKNOWN_MEMBER_ID in OffsetCommit: reset both.
* ILLEGAL_GENERATION in OffsetCommit: reset generation only.

When we add the generation id to the JoinGroup protocol, the JoinGroup response could also include ILLEGAL_GENERATION:

* ILLEGAL_GENERATION in JoinGroup: reset generation only.

Now back to your original question:

1) `StickyAssignor` in the new bytecode would get the `ownedPartitions` from the protocol directly, as `CooperativeStickyAssignor` does, and bump its metadata up to v2 with empty serialized data. The assign function would rely on the encoded metadata version to decide whether to retrieve the generation / `ownedPartitions` from the user data (v0, v1) or from the protocol (v2). Note that for old versions, where the version is not actually encoded, we would still need to rely on deserialization exceptions from the higher versions to fall back to the lower ones.
2) `CooperativeStickyAssignor` in the new bytecode would get the generation from the protocol directly and bump its metadata up to v2. As in 1) above, the assign function would rely on the encoded metadata version to decide whether to retrieve the generation / `ownedPartitions` from the user data (v0, v1) or from the protocol (v2).

3) In `AbstractPartitionAssignor`, we would add a `validateSubscription` function that takes in the `ownedPartitions` across all members and checks that they do not overlap. It needs to be called by all assignors; for a customized assignor, calling it is the assignor's own responsibility.

4) The broker-side coordinator would check the generation upon JoinGroup: if it is a sentinel value (e.g. null), it assumes this is a new member that has never been in the group, and hence treats it as belonging to the current generation; if it is not a sentinel value and is stale, it returns the error directly. Again, upon getting such an error the member should not clear its memberId, if it has one, but only reset its generation to null and clear its `ownedPartitions` before re-joining.
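The split between the two kinds of reset, as listed for the callers of `resetGenerationOnResponseError`, could look roughly like the following Java sketch. It assumes a simplified generation holder: `GroupError`, `MemberGeneration`, and `ResetOnError` are hypothetical names, not the real `AbstractCoordinator` types, and the sentinel values are chosen only for this example. Following the list of callers, the action depends only on the error code, not on which request returned it.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins for the error codes discussed above.
enum GroupError { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

// Hypothetical, simplified stand-in for the coordinator's Generation.
final class MemberGeneration {
    // Sentinel values chosen for this sketch only.
    static final int NO_GENERATION_ID = -1;
    static final String NO_MEMBER_ID = "";

    final int generationId;
    final String memberId;

    MemberGeneration(int generationId, String memberId) {
        this.generationId = generationId;
        this.memberId = Objects.requireNonNull(memberId);
    }
}

final class ResetOnError {
    private ResetOnError() {}

    // Returns the generation the member should hold before rejoining.
    static MemberGeneration onResponseError(MemberGeneration current, GroupError error) {
        switch (error) {
            case UNKNOWN_MEMBER_ID:
                // The broker no longer knows this member: reset both fields.
                return new MemberGeneration(MemberGeneration.NO_GENERATION_ID, MemberGeneration.NO_MEMBER_ID);
            case ILLEGAL_GENERATION:
                // Only the generation is stale: keep the member id so the
                // broker can still recognize this member when it rejoins.
                return new MemberGeneration(MemberGeneration.NO_GENERATION_ID, current.memberId);
            default:
                // No reset needed for other errors in this sketch.
                return current;
        }
    }
}
```

Keeping the member id on ILLEGAL_GENERATION lets the broker still recognize the member when it rejoins, rather than treating it as a brand-new member.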
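Points 1) and 2) depend on choosing where to read the generation and `ownedPartitions` from based on the metadata version, with a fallback for older versions that never encoded a version number. A minimal sketch of that decision follows, assuming made-up byte layouts for v0 and v1 (the real assignors use their own schemas) and a hypothetical `VersionedMetadataDecoder` class; partitions are plain integers to keep it short.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Result of decoding one member's subscription (hypothetical type for this sketch).
final class DecodedMetadata {
    final List<Integer> ownedPartitions;
    final Optional<Integer> generation;

    DecodedMetadata(List<Integer> ownedPartitions, Optional<Integer> generation) {
        this.ownedPartitions = ownedPartitions;
        this.generation = generation;
    }
}

final class VersionedMetadataDecoder {
    private VersionedMetadataDecoder() {}

    // Hypothetical byte layouts, for this sketch only:
    //   v0 user data: int32 count, then `count` int32 partition ids
    //   v1 user data: the v0 layout followed by an int32 generation
    //   v2 user data: empty; owned partitions and generation come from the protocol
    static DecodedMetadata decode(ByteBuffer userData,
                                  List<Integer> protocolOwnedPartitions,
                                  Optional<Integer> protocolGeneration) {
        if (userData == null || !userData.hasRemaining()) {
            // v2: the assignor serialized nothing, so use the protocol fields.
            return new DecodedMetadata(protocolOwnedPartitions, protocolGeneration);
        }
        try {
            // v0/v1 carry no version number: try the newer layout first...
            return decodeV1(userData.duplicate());
        } catch (BufferUnderflowException | IllegalArgumentException e) {
            // ...and fall back to the older layout if deserialization fails.
            return decodeV0(userData.duplicate());
        }
    }

    private static DecodedMetadata decodeV1(ByteBuffer buf) {
        List<Integer> partitions = readPartitions(buf);
        int generation = buf.getInt();
        requireFullyConsumed(buf);
        return new DecodedMetadata(partitions, Optional.of(generation));
    }

    private static DecodedMetadata decodeV0(ByteBuffer buf) {
        List<Integer> partitions = readPartitions(buf);
        requireFullyConsumed(buf);
        return new DecodedMetadata(partitions, Optional.empty());
    }

    private static List<Integer> readPartitions(ByteBuffer buf) {
        int count = buf.getInt();
        // Reject counts that cannot fit in the remaining bytes.
        if (count < 0 || count > buf.remaining() / Integer.BYTES) {
            throw new IllegalArgumentException("invalid partition count: " + count);
        }
        List<Integer> partitions = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            partitions.add(buf.getInt());
        }
        return partitions;
    }

    private static void requireFullyConsumed(ByteBuffer buf) {
        if (buf.hasRemaining()) {
            throw new IllegalArgumentException("trailing bytes in user data");
        }
    }
}
```

Trying the newer layout first works with these layouts because well-formed v0 data always runs out of bytes before the v1 generation field, so the underflow reliably signals the older version. Both attempts decode from `duplicate()` views, so the caller's buffer position is left untouched.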
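The overlap check from point 3) could be as simple as recording the first owner of each partition and failing when a second member claims it. In this sketch `OwnedPartitionsValidator` is a hypothetical stand-in for the proposed `validateSubscription`, and partitions are plain strings instead of Kafka's `TopicPartition`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed check; not an existing Kafka API.
// Partitions are plain strings such as "topic-0" to keep the example self-contained.
final class OwnedPartitionsValidator {
    private OwnedPartitionsValidator() {}

    // Fails if any partition is claimed as owned by more than one member.
    static void validateSubscription(Map<String, List<String>> ownedPartitionsByMember) {
        Map<String, String> ownerByPartition = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : ownedPartitionsByMember.entrySet()) {
            String member = entry.getKey();
            for (String partition : entry.getValue()) {
                // putIfAbsent returns the existing owner, or null if this is the first claim.
                String previousOwner = ownerByPartition.putIfAbsent(partition, member);
                if (previousOwner != null && !previousOwner.equals(member)) {
                    throw new IllegalStateException("Partition " + partition
                            + " is owned by both " + previousOwner + " and " + member);
                }
            }
        }
    }
}
```

Failing with an exception, rather than trying to repair the assignment, matches the idea of only detecting and failing the overlapping case.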
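Point 4) has a broker-side half (checking the generation on JoinGroup) and a client-side half (reacting to the resulting error). Both halves are sketched here in Java under stated assumptions: `JoinDecision`, `JoinGenerationCheck`, and `ConsumerMemberState` are hypothetical names, an empty `OptionalInt` stands in for the "null" sentinel, and "stale" is taken to mean older than the coordinator's current generation.

```java
import java.util.HashSet;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical outcome of the broker-side check; names are illustrative only.
enum JoinDecision { ACCEPT_AS_NEW_MEMBER, ACCEPT, REJECT_STALE_GENERATION }

final class JoinGenerationCheck {
    private JoinGenerationCheck() {}

    // An empty OptionalInt plays the role of the "null" sentinel in the proposal.
    static JoinDecision check(OptionalInt memberGeneration, int currentGeneration) {
        if (!memberGeneration.isPresent()) {
            // Never been in the group: treat it as joining the current generation.
            return JoinDecision.ACCEPT_AS_NEW_MEMBER;
        }
        if (memberGeneration.getAsInt() < currentGeneration) {
            // Stale generation: return the error right away.
            return JoinDecision.REJECT_STALE_GENERATION;
        }
        return JoinDecision.ACCEPT;
    }
}

// Hypothetical client-side state, showing the reaction to a stale-generation error.
final class ConsumerMemberState {
    final String memberId;
    OptionalInt generation;
    final Set<String> ownedPartitions = new HashSet<>();

    ConsumerMemberState(String memberId, int generation) {
        this.memberId = memberId;
        this.generation = OptionalInt.of(generation);
    }

    // Keep the member id; drop only the generation and the owned partitions.
    void onStaleGenerationError() {
        generation = OptionalInt.empty();
        ownedPartitions.clear();
    }
}
```

In this sketch a generation newer than the coordinator's is simply accepted; whether such a value should instead be rejected is left open.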