mjsax commented on code in PR #19691:
URL: https://github.com/apache/kafka/pull/19691#discussion_r2085659458


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() {
         return subtopologies;
     }
 
+    public int endpointInformationEpoch() {
+        return endpointInformationEpoch;
+    }
     public int topologyEpoch() {

Review Comment:
   nit: missing empty line



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -329,6 +329,8 @@ public String toString() {
 
     private final 
AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new 
AtomicReference<>(List.of());
 
+    private int endpointInformationEpoch = 0;

Review Comment:
   Should we init this with `-1` ?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+            
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        } else {
+            int responseEndpoint = Math.min(group.endpointInformationEpoch(), 
memberEndpointEpochInRequest);

Review Comment:
   We do not persist the endpoint-epoch, right? That is the case you are 
talking about?
   
   But I thought for this case `group.endpointInformationEpoch()` would be `-1` 
[or whatever value we initialize it] (ie unknown?) -- So 
`memberEndpointEpochInRequest` should always be larger?
   
   Correct me if I am wrong. The end-to-end control flow, is not totally clear 
to me atm.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+            
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+            
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        } else {
+            int responseEndpoint = Math.min(group.endpointInformationEpoch(), 
memberEndpointEpochInRequest);

Review Comment:
   ```suggestion
               int responseEndpointInformationEpoch = 
Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         String processId,
         Endpoint userEndpoint,
         List<KeyValue> clientTags,
-        boolean shutdownApplication
+        boolean shutdownApplication,
+        int memberEndpointEpochInRequest

Review Comment:
   All other variables also don't use `InRequest` suffix



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -205,6 +205,12 @@ public static class DeadlineAndEpoch {
      */
     private Optional<String> shutdownRequestMemberId = Optional.empty();
 
+    /**
+     * The current epoch for endpoint information, this is used to determine 
when to send
+     * updated endpoint information to members of the group.
+     */
+    private int endpointInformationEpoch;

Review Comment:
   Should we init this to `-1`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
             
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
             
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+            group.setEndpointInformationEpoch(group.endpointInformationEpoch() 
+ 1);

Review Comment:
   Do we need to change the reponse, too, using the bumped epoch, as also 
ensure the endpoint information is set?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         String processId,
         Endpoint userEndpoint,
         List<KeyValue> clientTags,
-        boolean shutdownApplication
+        boolean shutdownApplication,
+        int memberEndpointEpochInRequest

Review Comment:
   ```suggestion
           int memberEndpointEpoch
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to