mjsax commented on code in PR #19691: URL: https://github.com/apache/kafka/pull/19691#discussion_r2085659458
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java: ########## @@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() { return subtopologies; } + public int endpointInformationEpoch() { + return endpointInformationEpoch; + } public int topologyEpoch() { Review Comment: nit: missing empty line ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java: ########## @@ -329,6 +329,8 @@ public String toString() { private final AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new AtomicReference<>(List.of()); + private int endpointInformationEpoch = 0; Review Comment: Should we init this with `-1` ? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) .setMemberEpoch(updatedMember.memberEpoch()) - .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)) - .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); + .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)); + + if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) { + response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); + response.setEndpointInformationEpoch(group.endpointInformationEpoch()); + } else { + int responseEndpoint = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest); Review Comment: We do not persist the endpoint-epoch, right? That is the case you are talking about? But I thought for this case `group.endpointInformationEpoch()` would be `-1` [or whatever value we initialize it] (ie unknown?) -- So `memberEndpointEpochInRequest` should always be larger? Correct me if I am wrong. The end-to-end control flow, is not totally clear to me atm. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) .setMemberEpoch(updatedMember.memberEpoch()) - .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)) - .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); + .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId)); + + if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) { + response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); + response.setEndpointInformationEpoch(group.endpointInformationEpoch()); + } else { + int responseEndpoint = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest); Review Comment: ```suggestion int responseEndpointInformationEpoch = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest); ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream String processId, Endpoint userEndpoint, List<KeyValue> clientTags, - boolean shutdownApplication + boolean shutdownApplication, + int memberEndpointEpochInRequest Review Comment: All other variables also don't use `InRequest` suffix ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ########## @@ -205,6 +205,12 @@ public static class DeadlineAndEpoch { */ private Optional<String> shutdownRequestMemberId = Optional.empty(); + /** + * The current epoch for endpoint information, this is used to determine when to send + * updated endpoint information to members of the group. + */ + private int endpointInformationEpoch; Review Comment: Should we init this to `-1`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks())); response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks())); response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks())); + group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1); Review Comment: Do we need to change the reponse, too, using the bumped epoch, as also ensure the endpoint information is set? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream String processId, Endpoint userEndpoint, List<KeyValue> clientTags, - boolean shutdownApplication + boolean shutdownApplication, + int memberEndpointEpochInRequest Review Comment: ```suggestion int memberEndpointEpoch ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org