lucasbru commented on code in PR #19691:
URL: https://github.com/apache/kafka/pull/19691#discussion_r2087438125

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() {
         return subtopologies;
     }
 
+    public int endpointInformationEpoch() {
+        return endpointInformationEpoch;

Review Comment:
   `StreamsRebalanceData` is used as an interface between the application thread and the streams thread. The data here should be immutable or otherwise thread-safe. But I'm not sure we even need to access this epoch from the application thread. I'd consider moving the client-side epoch to `StreamsGroupHeartbeatManager.HeartbeatState`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+            response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        } else {
+            int responseEndpoint = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest);

Review Comment:
   I would always respond with `group.endpointInformationEpoch()` and omit this if-block. If the broker-side epoch is lost, it will be less than the member epoch, in which case we should reset the member epoch to the broker epoch and resend the endpoint information, just in case.

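A minimal sketch of what this suggestion could look like, written as standalone Java rather than against the real `GroupMetadataManager`. The class `EndpointEpochSketch`, the record `EndpointSection`, the method `build`, and the endpoint strings are all hypothetical names introduced for illustration. The response always carries the group's epoch, and endpoint data is attached whenever the member's epoch differs from it.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

final class EndpointEpochSketch {

    /** Hypothetical stand-in for the endpoint fields of the heartbeat response. */
    record EndpointSection(int epoch, Optional<List<String>> endpoints) { }

    /**
     * Always reports the group's epoch; attaches endpoint data whenever the
     * member's epoch differs from it (member behind, or broker epoch reset).
     */
    static EndpointSection build(int groupEpoch,
                                 int memberEpochInRequest,
                                 Supplier<List<String>> endpointBuilder) {
        Optional<List<String>> endpoints = groupEpoch != memberEpochInRequest
            ? Optional.of(endpointBuilder.get())
            : Optional.empty();
        return new EndpointSection(groupEpoch, endpoints);
    }

    public static void main(String[] args) {
        // Placeholder endpoint data, not taken from the PR.
        Supplier<List<String>> builder = () -> List.of("host-a:8080", "host-b:8080");

        // Member is behind the group: endpoints are sent.
        System.out.println(build(5, 3, builder).endpoints().isPresent()); // true

        // Member is up to date: nothing to resend.
        System.out.println(build(5, 5, builder).endpoints().isPresent()); // false

        // Broker-side epoch was lost and restarted below the member's epoch:
        // endpoints are resent and the member is reset to the group's epoch.
        EndpointSection afterFailover = build(0, 5, builder);
        System.out.println(afterFailover.epoch() + " " + afterFailover.endpoints().isPresent()); // 0 true
    }
}
```

With this shape there is no need for a `Math.min` branch: the member simply adopts whatever epoch the broker reports.
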
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
             response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+            group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);

Review Comment:
   Maybe we can move the block that sets the endpoint information in the response below this block.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {

Review Comment:
   For the case where we lose the group's endpoint information during fail-over, shouldn't we also send the information, just in case? That would mean the condition should be `group.endpointInformationEpoch() != memberEndpointEpochInRequest`.
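
To illustrate why the ordering raised in the `setEndpointInformationEpoch` comment could matter, here is a small standalone sketch. Everything in it is hypothetical: the `Group` class is a stand-in for the broker-side group state, and the `tasksChanged` flag is an assumption about when the epoch bump happens, since the diff context does not show the enclosing condition. It compares the epoch a response would carry when the endpoint section is built before versus after the bump.

```java
final class EpochOrderingSketch {

    /** Hypothetical stand-in for the group's broker-side state. */
    static final class Group {
        private int endpointInformationEpoch;

        int endpointInformationEpoch() {
            return endpointInformationEpoch;
        }

        void setEndpointInformationEpoch(int epoch) {
            this.endpointInformationEpoch = epoch;
        }
    }

    /** Endpoint section built first: the response carries the pre-bump epoch. */
    static int epochInResponseWhenBuiltBeforeBump(Group group, boolean tasksChanged) {
        int epochInResponse = group.endpointInformationEpoch();
        if (tasksChanged) { // assumed trigger for the bump
            group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
        }
        return epochInResponse;
    }

    /** Endpoint section built last: the response carries the bumped epoch. */
    static int epochInResponseWhenBuiltAfterBump(Group group, boolean tasksChanged) {
        if (tasksChanged) { // assumed trigger for the bump
            group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
        }
        return group.endpointInformationEpoch();
    }

    public static void main(String[] args) {
        Group first = new Group();
        System.out.println(epochInResponseWhenBuiltBeforeBump(first, true)); // 0
        System.out.println(first.endpointInformationEpoch());                // 1

        Group second = new Group();
        System.out.println(epochInResponseWhenBuiltAfterBump(second, true)); // 1
    }
}
```

In the first variant the member would only learn about the new epoch on a later heartbeat; in the second it sees the bumped epoch in the same response that carries its new tasks.
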