lucasbru commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3356120307


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2191,7 +2207,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         response.setStatus(returnedStatus);
 
-        return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
+        return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated, group.currentTopologyEpoch()));

Review Comment:
   `group.currentTopologyEpoch()` reads the epoch from the group's current 
state, but by this point we may have updated the topology and appended a record 
for it, so the value read here can be stale. I think this should use 
`updatedTopology.topologyEpoch()` instead.
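
   A minimal sketch of the concern, using hypothetical stand-in types rather 
than the real coordinator classes. It assumes the group object does not reflect 
the newly appended topology record at the point where the result is built; 
`HeartbeatEpochSketch`, `Topology`, `Group` and `epochForResult` are 
illustrative names only.

```java
// Hypothetical stand-ins for illustration; not the real Kafka classes.
final class HeartbeatEpochSketch {

    /** Topology computed during the heartbeat, possibly newer than the group's. */
    static final class Topology {
        private final int topologyEpoch;

        Topology(int topologyEpoch) {
            this.topologyEpoch = topologyEpoch;
        }

        int topologyEpoch() {
            return topologyEpoch;
        }
    }

    /** Group state as it was before the records of this heartbeat were applied. */
    static final class Group {
        private final int currentTopologyEpoch;

        Group(int currentTopologyEpoch) {
            this.currentTopologyEpoch = currentTopologyEpoch;
        }

        int currentTopologyEpoch() {
            return currentTopologyEpoch;
        }
    }

    /** Epoch to report in the heartbeat result: taken from the updated topology. */
    static int epochForResult(Group group, Topology updatedTopology) {
        // Assumption: group.currentTopologyEpoch() can still return the old epoch
        // here, so it is deliberately not used.
        return updatedTopology.topologyEpoch();
    }

    public static void main(String[] args) {
        Group group = new Group(3);
        Topology updated = new Topology(4);
        System.out.println("group epoch: " + group.currentTopologyEpoch());
        System.out.println("reported epoch: " + epochForResult(group, updated));
    }
}
```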



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########


Review Comment:
   Not sure if we should have this overload. It would be easy to call it by 
mistake and reset our topology epoch metadata. Does introducing it save us 
many changes?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1223,17 +1224,15 @@ public 
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
             
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future 
=
-                runtime.scheduleReadOperation(
-                    "streams-group-describe",
-                    topicPartition,
-                    (coordinator, lastCommittedOffset) -> 
coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
-                ).exceptionally(exception -> handleOperationException(
-                    "streams-group-describe",
-                    groupList,
-                    exception,
-                    (error, __) -> 
StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error),
-                    log
-                ));
+                runtime.scheduleReadOperation("streams-group-describe",

Review Comment:
   nit: somehow, this was reformatted



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupDescribeResult.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the per-group describe response data with the persisted topology 
epoch the plugin has stored
+ * for each successfully described group (KIP-1331). The stored epoch lets the 
service layer decide
+ * whether to call the topology description plugin's {@code getTopology} for a 
group — the plugin is
+ * only consulted when {@code storedTopologyEpoch == currentTopologyEpoch}.

Review Comment:
   Minor: `storedTopologyEpoch == currentTopologyEpoch` also holds when both 
are -1 (brand-new group, nothing pushed by members and nothing stored), where 
the plugin has nothing to serve. Should the guard also require 
`storedTopologyEpoch != -1`?
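
   The stricter guard could look roughly like the sketch below. 
`TopologyPluginGuardSketch`, `NO_EPOCH` and `shouldConsultPlugin` are 
hypothetical names; only the -1 sentinel and the two epoch names come from the 
discussion above.

```java
// Hypothetical helper for illustration; not part of the PR.
final class TopologyPluginGuardSketch {

    /** Sentinel taken from the review comment: -1 means no epoch is known. */
    static final int NO_EPOCH = -1;

    /**
     * Returns true only when the plugin has stored something and what it
     * stored matches the group's current topology epoch.
     */
    static boolean shouldConsultPlugin(int storedTopologyEpoch, int currentTopologyEpoch) {
        return storedTopologyEpoch != NO_EPOCH
            && storedTopologyEpoch == currentTopologyEpoch;
    }

    public static void main(String[] args) {
        // Brand-new group: both epochs are -1, so the plugin is skipped.
        System.out.println(shouldConsultPlugin(NO_EPOCH, NO_EPOCH));
        // Stored epoch matches the current epoch, so the plugin is consulted.
        System.out.println(shouldConsultPlugin(7, 7));
    }
}
```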



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -283,6 +283,39 @@ public void testNewStreamsGroupMetadataRecord() {
         )));
     }
 
+    @Test
+    public void testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+        // KIP-1331: the 7-arg overload persists storedTopologyEpoch and lastFailedTopologyEpoch.
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(42)
+                    .setMetadataHash(43)
+                    .setValidatedTopologyEpoch(44)
+                    .setLastAssignmentConfigs(List.of())
+                    .setStoredTopologyEpoch(7)
+                    .setLastFailedTopologyEpoch(5),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+            GROUP_ID, 42, 43, 44, Map.of(), 7, 5));
+    }
+
+    @Test
+    public void testNewStreamsGroupMetadataRecordFiveArgOverloadDefaultsToMinusOne() {
+        // The 5-arg back-compat overload must persist -1 for both new fields so a routine call site
+        // that hasn't been retrofitted does not accidentally clear a previously-stored epoch.

Review Comment:
   The reasoning reads backwards: writing -1 doesn't preserve a 
previously-stored epoch, it clears it on replay (`setStoredTopologyEpoch(-1)`). 
The actual invariant is that every production epoch-bump/fence path must use 
the 7-arg overload — the 5-arg one persisting -1 is the unsafe fallback, not a 
safe one. Worth rewording so the comment doesn't imply -1 is preserving.
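
   A small sketch of why the short overload is the unsafe one. The types and 
the reduced argument lists are hypothetical simplifications of the real 7-arg 
and 5-arg helpers, and the sketch assumes that replay simply overwrites the 
in-memory epoch with whatever the record carries.

```java
// Hypothetical, simplified model of the two overloads; not the real helpers.
final class OverloadReplaySketch {

    static final int NO_EPOCH = -1;

    /** Simplified record value carrying the two topology description epochs. */
    static final class MetadataValue {
        final int epoch;
        final int storedTopologyEpoch;
        final int lastFailedTopologyEpoch;

        MetadataValue(int epoch, int storedTopologyEpoch, int lastFailedTopologyEpoch) {
            this.epoch = epoch;
            this.storedTopologyEpoch = storedTopologyEpoch;
            this.lastFailedTopologyEpoch = lastFailedTopologyEpoch;
        }
    }

    /** Full form: the caller states both topology description epochs explicitly. */
    static MetadataValue newRecord(int epoch, int storedTopologyEpoch, int lastFailedTopologyEpoch) {
        return new MetadataValue(epoch, storedTopologyEpoch, lastFailedTopologyEpoch);
    }

    /** Short form: silently falls back to -1 for both epochs. */
    static MetadataValue newRecord(int epoch) {
        return newRecord(epoch, NO_EPOCH, NO_EPOCH);
    }

    /** In-memory state; replay overwrites it with the record's value. */
    static final class GroupState {
        private int storedTopologyEpoch = NO_EPOCH;

        void replay(MetadataValue value) {
            storedTopologyEpoch = value.storedTopologyEpoch;
        }

        int storedTopologyEpoch() {
            return storedTopologyEpoch;
        }
    }

    public static void main(String[] args) {
        GroupState state = new GroupState();
        state.replay(newRecord(42, 7, 5));
        System.out.println("after full form: " + state.storedTopologyEpoch());
        state.replay(newRecord(43));
        System.out.println("after short form: " + state.storedTopologyEpoch());
    }
}
```

   In this model, a later epoch bump written through the short form wipes the 
stored epoch, which is the failure mode the test comment should describe.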



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -737,17 +738,21 @@ public 
List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe(
      * @param groupIds          The IDs of the groups to describe.
      * @param committedOffset   A specified committed offset corresponding to 
this shard.
      *
-     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.
-     *         If a group is not found, the DescribedGroup will contain the 
error code and message.
+     * @return A {@link StreamsGroupDescribeResult} bundling the described 
groups with per-group
+     *         storedTopologyEpoch (KIP-1331). If a group is not found, its 
DescribedGroup carries the
+     *         error code and message and is omitted from the stored-epoch map.
      */
-    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+    public StreamsGroupDescribeResult streamsGroupDescribe(
         List<String> groupIds,
         long committedOffset
     ) {
         final List<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>();
+        final Map<String, Integer> groupIdToStoredTopologyEpochs = new HashMap<>();
         groupIds.forEach(groupId -> {
             try {
-                describedGroups.add(streamsGroup(groupId, committedOffset).asDescribedGroup(committedOffset));
+                StreamsGroup group = streamsGroup(groupId, committedOffset);
+                describedGroups.add(group.asDescribedGroup(committedOffset));
+                groupIdToStoredTopologyEpochs.put(groupId, group.storedTopologyEpoch());

Review Comment:
   I think we want a consistent view of the group here, so we need to pass 
`committedOffset` into `group.storedTopologyEpoch`.
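
   To illustrate the difference between a latest read and a read at the 
committed offset, here is a toy versioned integer. It is a hypothetical 
stand-in and not how the coordinator's timeline data structures are 
implemented; `CommittedReadSketch` and `VersionedInt` are illustrative names.

```java
// Toy stand-in for a value that can be read as of a given offset.
final class CommittedReadSketch {

    static final class VersionedInt {
        // Maps the offset at which a value was written to that value.
        private final java.util.TreeMap<Long, Integer> history = new java.util.TreeMap<>();
        private final int initial;

        VersionedInt(int initial) {
            this.initial = initial;
        }

        /** Records a new value written at the given offset. */
        void set(long offset, int value) {
            history.put(offset, value);
        }

        /** Latest value, including writes that may not be committed yet. */
        int latest() {
            return history.isEmpty() ? initial : history.lastEntry().getValue();
        }

        /** Value as of the given committed offset; later writes are ignored. */
        int get(long committedOffset) {
            java.util.Map.Entry<Long, Integer> entry = history.floorEntry(committedOffset);
            return entry == null ? initial : entry.getValue();
        }
    }

    public static void main(String[] args) {
        VersionedInt storedTopologyEpoch = new VersionedInt(-1);
        storedTopologyEpoch.set(10L, 3); // written at offset 10
        storedTopologyEpoch.set(20L, 4); // written at offset 20, not yet committed
        System.out.println("latest: " + storedTopologyEpoch.latest());
        System.out.println("as of offset 15: " + storedTopologyEpoch.get(15L));
    }
}
```

   Reading every field of the group at the same `committedOffset` keeps the 
described group and its stored epoch consistent with each other.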



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -900,16 +901,34 @@ public 
List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
      *
      * @param groupIds      The IDs of the groups to describe.
      *
-     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.
+     * @return A {@link StreamsGroupDescribeResult} containing the described 
groups and per-group
+     *         stored topology epoch (KIP-1331).
      *
      */
-    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+    public StreamsGroupDescribeResult streamsGroupDescribe(
         List<String> groupIds,
         long committedOffset
     ) {
         return groupMetadataManager.streamsGroupDescribe(groupIds, 
committedOffset);
     }
 
+    /**
+     * Validates that a streams group exists and that the given member is a 
current member of it
+     * (KIP-1331). Read-only; runs against the live state machine, so the 
caller must schedule this
+     * on the coordinator runtime like any other read.
+     *
+     * @param groupId   The group ID.
+     * @param memberId  The member ID.
+     * @throws GroupIdNotFoundException if the group does not exist.
+     * @throws UnknownMemberIdException if the member is not in the group.
+     */
+    public void validateStreamsGroupMember(
+        String groupId,
+        String memberId
+    ) {
+        groupMetadataManager.validateStreamsGroupMember(groupId, memberId);

Review Comment:
   It probably does not make a big difference in this case, but I think this 
should carry a `committedOffset` to avoid reading potentially uncommitted 
fencing / leaves.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
