lucasbru commented on code in PR #19691:
URL: https://github.com/apache/kafka/pull/19691#discussion_r2087438125


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() {
         return subtopologies;
     }
 
+    public int endpointInformationEpoch() {
+        return endpointInformationEpoch;

Review Comment:
   `StreamsRebalanceData` is used as an interface between the application 
thread and the streams thread. The data here should be immutable or otherwise 
threadsafe. But I'm not sure we need even access this epoch from the 
application thread? I'd consider moving the client-side epoch to 
`StreamsGroupHeartbeatManager.HeartbeatState`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+            
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        } else {
+            int responseEndpoint = Math.min(group.endpointInformationEpoch(), 
memberEndpointEpochInRequest);

Review Comment:
   I would always respond with `group.endpointInformationEpoch()` and omit this 
if-block.
   
   If the broker-side epoch is lost, it will be less than then member epoch, in 
which case we should reset the member epoch to the broker epoch and resend the 
endpoint information, just in case.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
             
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
             
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+            group.setEndpointInformationEpoch(group.endpointInformationEpoch() 
+ 1);

Review Comment:
   maybe we can move the block around setting endpoint information in the 
response below this block.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {

Review Comment:
   For the case where we lose the group's endpoint information during 
fail-over, shouldn't we also send the information, just in case?
   
   Meaning that the condition should be 
   ```group.endpointInformationEpoch() != memberEndpointEpochInRequest```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to