lucasbru commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3371552600


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -737,17 +738,23 @@ public 
List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe(
      * @param groupIds          The IDs of the groups to describe.
      * @param committedOffset   A specified committed offset corresponding to 
this shard.
      *
-     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.
-     *         If a group is not found, the DescribedGroup will contain the 
error code and message.
+     * @return A {@link StreamsGroupDescribeResult} bundling the described 
groups with per-group
+     *         storedTopologyEpoch (KIP-1331). If a group is not found, its 
DescribedGroup carries the
+     *         error code and message and is omitted from the stored-epoch map.
      */
-    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+    public StreamsGroupDescribeResult streamsGroupDescribe(
         List<String> groupIds,
         long committedOffset
     ) {
         final List<StreamsGroupDescribeResponseData.DescribedGroup> 
describedGroups = new ArrayList<>();
+        final Map<String, Integer> groupIdToStoredTopologyEpochs = new 
HashMap<>();
         groupIds.forEach(groupId -> {
             try {
-                describedGroups.add(streamsGroup(groupId, 
committedOffset).asDescribedGroup(committedOffset));
+                StreamsGroup group = streamsGroup(groupId, committedOffset);
+                describedGroups.add(group.asDescribedGroup(committedOffset));
+                // KIP-1331: read the stored epoch at the same committedOffset 
so describedGroup and the

Review Comment:
   Remove the inline comment



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -646,6 +688,56 @@ public void setValidatedTopologyEpoch(int 
validatedTopologyEpoch) {
         maybeUpdateGroupState();
     }
 
+    /**
+     * @return The topology epoch most recently accepted by the topology 
description plugin, or -1 if none.
+     */
+    public int storedTopologyEpoch() {
+        return storedTopologyEpoch.get();
+    }
+
+    /**
+     * @param committedOffset A committed offset corresponding to the desired 
snapshot.
+     * @return The topology epoch most recently accepted by the topology 
description plugin at the given

Review Comment:
   "accepted" is probably not the perfect word here.
   I suppose "successfully stored" would describe it better.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2191,7 +2209,10 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         response.setStatus(returnedStatus);
 
-        return new CoordinatorResult<>(records, new 
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
+        // KIP-1331: use updatedTopology.topologyEpoch() rather than 
group.currentTopologyEpoch(). On the first

Review Comment:
   Remove this inline comment. I don't need to keep a history of every 
discussion thread in inline comments.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -280,7 +280,29 @@ public void testNewStreamsGroupMetadataRecord() {
 
         assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 
44, Map.of(
             "num.standby.replicas", "2"
-        )));
+        ), -1, -1));
+    }
+
+    @Test
+    public void 
testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+        // KIP-1331: the 7-arg overload persists storedTopologyEpoch and 
lastFailedTopologyEpoch.

Review Comment:
   Remove inline comment



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8015,6 +8048,29 @@ private CoordinatorResult<HeartbeatResponseData, 
CoordinatorRecord> classicGroup
         );
     }
 
+    /**
+     * Validates that a streams group exists and that the given member is a 
current member of it.
+     * Used by the StreamsGroupTopologyDescriptionUpdate RPC handler 
(KIP-1331) to enforce the

Review Comment:
   This seems like a generic validation function to me; we don't need to 
mention KIP-1331 here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -900,16 +901,37 @@ public 
List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
      *
      * @param groupIds      The IDs of the groups to describe.
      *
-     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.
+     * @return A {@link StreamsGroupDescribeResult} containing the described 
groups and per-group
+     *         stored topology epoch (KIP-1331).
      *
      */
-    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+    public StreamsGroupDescribeResult streamsGroupDescribe(
         List<String> groupIds,
         long committedOffset
     ) {
         return groupMetadataManager.streamsGroupDescribe(groupIds, 
committedOffset);
     }
 
+    /**
+     * Validates that a streams group exists and that the given member is a 
current member of it

Review Comment:
   Seems like a generic method, we don't need to mention KIP-1331 here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4504,7 +4525,17 @@ private <T> CoordinatorResult<T, CoordinatorRecord> 
streamsGroupFenceMember(
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, 
group.metadataHash(), group.validatedTopologyEpoch(), 
group.lastAssignmentConfigs()));
+        // KIP-1331: preserve storedTopologyEpoch/lastFailedTopologyEpoch 
across a fence. Fencing a single member

Review Comment:
   Remove inline comment



##########
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json:
##########
@@ -27,7 +27,11 @@
     { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": 
"0+", "tag": 0, "default": -1, "type": "int32",
       "about": "The topology epoch whose topics are validated to be present in 
a valid configuration in the metadata." },
     { "name": "LastAssignmentConfigs","taggedVersions": "0+", 
"nullableVersions": "0+","tag": 1, "default": null,
-      "type": "[]LastAssignmentConfig", "about": "The last used configuration 
parameters as key-value pairs." }
+      "type": "[]LastAssignmentConfig", "about": "The last used configuration 
parameters as key-value pairs." },
+    { "name": "StoredTopologyEpoch", "versions": "0+", "taggedVersions": "0+", 
"tag": 2, "default": -1, "type": "int32",

Review Comment:
   Sorry for raising this late: the names `StoredTopologyEpoch` / 
`LastFailedTopologyEpoch` read ambiguously alongside the existing 
`topologyEpoch` (the group's current topology epoch). The description doesn't 
have its own epoch — it inherits the topology's — so the field really tracks 
*which* topology epoch we have a stored/failed description for. Suggest 
renaming to `StoredDescriptionTopologyEpoch` / `FailedDescriptionTopologyEpoch` 
(parallel to the existing `validatedTopologyEpoch`); same applies to the Java 
fields/accessors/setters and the `storedTopologyEpochs` map in 
`StreamsGroupDescribeResult`. Should be a straight search/replace.
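
   A minimal sketch of how the renamed accessors could read next to 
`validatedTopologyEpoch`. The class name `StreamsGroupEpochsSketch` is 
hypothetical, and plain `int` fields stand in for whatever state holder the 
real `StreamsGroup` uses; only the naming is the point here.

```java
public class StreamsGroupEpochsSketch {
    // Existing concept: the topology epoch whose topics have been validated.
    private int validatedTopologyEpoch = -1;

    // Proposed names: the topology epoch for which a description was stored,
    // and the topology epoch for which storing a description failed.
    private int storedDescriptionTopologyEpoch = -1;
    private int failedDescriptionTopologyEpoch = -1;

    public int validatedTopologyEpoch() {
        return validatedTopologyEpoch;
    }

    public int storedDescriptionTopologyEpoch() {
        return storedDescriptionTopologyEpoch;
    }

    public void setStoredDescriptionTopologyEpoch(int epoch) {
        this.storedDescriptionTopologyEpoch = epoch;
    }

    public int failedDescriptionTopologyEpoch() {
        return failedDescriptionTopologyEpoch;
    }

    public void setFailedDescriptionTopologyEpoch(int epoch) {
        this.failedDescriptionTopologyEpoch = epoch;
    }

    public static void main(String[] args) {
        StreamsGroupEpochsSketch group = new StreamsGroupEpochsSketch();
        group.setStoredDescriptionTopologyEpoch(3);
        System.out.println(group.storedDescriptionTopologyEpoch());
    }
}
```

   With these names, each accessor says which topology epoch it refers to and 
what happened to the description at that epoch, so none of them can be 
mistaken for the group's current topology epoch.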



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java:
##########
@@ -26,14 +26,28 @@
 /**
  * A simple record to hold the result of a StreamsGroupHeartbeat request.
  *
- * @param data            The data to be returned to the client.
- * @param creatableTopics The internal topics to be created.
+ * @param data                  The data to be returned to the client.
+ * @param creatableTopics       The internal topics to be created.
+ * @param currentTopologyEpoch  The topology epoch the group is operating at 
after this heartbeat, or -1 if the
+ *                              group has no topology yet. The service layer 
uses this to decide whether to set
+ *                              TopologyDescriptionRequired on the response 
(KIP-1331).
  */
-public record StreamsGroupHeartbeatResult(StreamsGroupHeartbeatResponseData 
data, Map<String, CreatableTopic> creatableTopics) {
+public record StreamsGroupHeartbeatResult(
+    StreamsGroupHeartbeatResponseData data,
+    Map<String, CreatableTopic> creatableTopics,
+    int currentTopologyEpoch
+) {
 
     public StreamsGroupHeartbeatResult {
         Objects.requireNonNull(data);
         creatableTopics = 
Collections.unmodifiableMap(Objects.requireNonNull(creatableTopics));
     }
 
+    public StreamsGroupHeartbeatResult(

Review Comment:
   `StreamsGroupHeartbeatResult` is internal coordinator plumbing, not public 
API — there's no back-compat to preserve. The 2-arg overload has exactly one 
remaining caller (`GroupCoordinatorServiceTest.java:516`), and 
`testTwoArgConstructorDefaultsTopologyEpochToMinusOne` only exists to test the 
overload itself. Updating that one test line to pass `-1` explicitly lets us 
drop both the overload and the test.
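
   Roughly what I have in mind, as a self-contained sketch. `ResponseData` and 
`CreatableTopic` below are hypothetical stand-ins for the real Kafka types, 
and `HeartbeatResultSketch` is just a wrapper so the example compiles on its 
own; the record keeps only its canonical three-argument constructor.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class HeartbeatResultSketch {
    // Hypothetical stand-ins for StreamsGroupHeartbeatResponseData and CreatableTopic.
    public record ResponseData(String memberId) { }
    public record CreatableTopic(String name) { }

    // Only the canonical constructor remains; there is no 2-arg overload.
    public record StreamsGroupHeartbeatResult(
        ResponseData data,
        Map<String, CreatableTopic> creatableTopics,
        int currentTopologyEpoch
    ) {
        public StreamsGroupHeartbeatResult {
            Objects.requireNonNull(data);
            creatableTopics = Collections.unmodifiableMap(Objects.requireNonNull(creatableTopics));
        }
    }

    public static void main(String[] args) {
        // The remaining test call site passes -1 explicitly instead of using an overload.
        StreamsGroupHeartbeatResult result =
            new StreamsGroupHeartbeatResult(new ResponseData("member-1"), Map.of(), -1);
        System.out.println(result.currentTopologyEpoch());
    }
}
```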



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupDescribeResult.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the per-group describe response data with the persisted topology 
epoch the plugin has stored

Review Comment:
   I think for this comment, I'd keep the main javadoc description independent 
of KIP-1331, and put a shorter description of the purpose of 
storedTopologyEpochs into its parameter description, where it makes sense to 
mention KIP-1331.
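
   Something along these lines, as a sketch only. `DescribedGroup` is a 
hypothetical stand-in for `StreamsGroupDescribeResponseData.DescribedGroup`, 
the component name `describedGroups` is my assumption, and the defensive 
copies are illustrative rather than what the PR does.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DescribeResultSketch {
    /** Hypothetical stand-in for StreamsGroupDescribeResponseData.DescribedGroup. */
    public record DescribedGroup(String groupId) { }

    /**
     * The result of describing a set of streams groups.
     *
     * @param describedGroups      The described groups, including entries that carry an error.
     * @param storedTopologyEpochs Per group ID, the topology epoch for which a topology
     *                             description has been stored (KIP-1331). Groups that were
     *                             not found are omitted.
     */
    public record StreamsGroupDescribeResult(
        List<DescribedGroup> describedGroups,
        Map<String, Integer> storedTopologyEpochs
    ) {
        public StreamsGroupDescribeResult {
            describedGroups = List.copyOf(Objects.requireNonNull(describedGroups));
            storedTopologyEpochs = Map.copyOf(Objects.requireNonNull(storedTopologyEpochs));
        }
    }

    public static void main(String[] args) {
        StreamsGroupDescribeResult result = new StreamsGroupDescribeResult(
            List.of(new DescribedGroup("group-1")),
            Map.of("group-1", 2)
        );
        System.out.println(result.storedTopologyEpochs().get("group-1"));
    }
}
```

   The class-level description stays generic, and the KIP reference lives only 
on the one parameter it explains.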



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2084,7 +2091,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
+            // KIP-1331: persist storedTopologyEpoch/lastFailedTopologyEpoch 
unchanged on a routine epoch bump.

Review Comment:
   Remove this inline comment. I don't need to keep a history of every 
discussion thread in inline comments.


