lucasbru commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3356120307
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2191,7 +2207,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
response.setStatus(returnedStatus);
- return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
+ return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated, group.currentTopologyEpoch()));
Review Comment:
`group.currentTopologyEpoch()` reads the group's topology here, but we may have updated the topology during this heartbeat and appended a record for it that the group does not reflect yet. I think this should use `updatedTopology.topologyEpoch()` instead.
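A minimal sketch of why the returned epoch can be stale, assuming (as the comment implies) that the in-memory group only picks up a new topology when the appended record is replayed. `StaleEpochSketch`, `Group`, `TopologyRecord`, `Result`, and `heartbeat` are hypothetical stand-ins; only `currentTopologyEpoch()` and the notion of an updated topology epoch come from the review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: the group changes only when a record is replayed,
// so reading it in the same request still returns the pre-update value.
final class StaleEpochSketch {
    record TopologyRecord(String groupId, int topologyEpoch) {}

    record Result(List<TopologyRecord> records, int returnedEpoch) {}

    static final class Group {
        private int currentTopologyEpoch;

        Group(int currentTopologyEpoch) {
            this.currentTopologyEpoch = currentTopologyEpoch;
        }

        int currentTopologyEpoch() {
            return currentTopologyEpoch;
        }

        // Applied when the record is replayed, not when it is generated.
        void replay(TopologyRecord record) {
            currentTopologyEpoch = record.topologyEpoch();
        }
    }

    // A heartbeat that bumps the topology epoch and appends a record for it.
    static Result heartbeat(Group group, int newTopologyEpoch, boolean useUpdatedTopology) {
        List<TopologyRecord> records = new ArrayList<>();
        int updatedTopologyEpoch = newTopologyEpoch;
        records.add(new TopologyRecord("group-id", updatedTopologyEpoch));
        // Reading from the group here sees the old epoch, because nothing was replayed yet.
        int returned = useUpdatedTopology ? updatedTopologyEpoch : group.currentTopologyEpoch();
        return new Result(records, returned);
    }
}
```

With a group at epoch 1 and a heartbeat that bumps it to 2, returning the group's value reports 1, while returning the updated topology's value reports 2, which matches what the group holds after replay.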
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
Review Comment:
Not sure we should have this overload. It seems easy to call it by mistake and reset our topology epoch metadata. Are we saving a lot of changes by introducing it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1223,17 +1224,15 @@ public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
- runtime.scheduleReadOperation(
- "streams-group-describe",
- topicPartition,
- (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
- ).exceptionally(exception -> handleOperationException(
- "streams-group-describe",
- groupList,
- exception,
- (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error),
- log
- ));
+ runtime.scheduleReadOperation("streams-group-describe",
Review Comment:
nit: somehow, this was reformatted
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupDescribeResult.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the per-group describe response data with the persisted topology epoch the plugin has stored
+ * for each successfully described group (KIP-1331). The stored epoch lets the service layer decide
+ * whether to call the topology description plugin's {@code getTopology} for a group — the plugin is
+ * only consulted when {@code storedTopologyEpoch == currentTopologyEpoch}.
Review Comment:
Minor: `storedTopologyEpoch == currentTopologyEpoch` also holds when both
are -1 (brand-new group, nothing pushed by members and nothing stored), where
the plugin has nothing to serve. Should the guard also require
`storedTopologyEpoch != -1`?
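A small sketch of the suggested guard, assuming -1 is the sentinel for "nothing stored / nothing pushed" as the comment describes. `TopologyPluginGuard` and both methods are hypothetical names for illustration only.

```java
// Hypothetical helper contrasting the Javadoc's rule with the suggested, stricter guard.
final class TopologyPluginGuard {
    // Sentinel for a group with no stored and no current topology epoch.
    static final int NO_TOPOLOGY_EPOCH = -1;

    private TopologyPluginGuard() {}

    // Rule as written in the Javadoc: consult the plugin whenever the epochs match.
    static boolean epochsMatch(int storedTopologyEpoch, int currentTopologyEpoch) {
        return storedTopologyEpoch == currentTopologyEpoch;
    }

    // Suggested rule: the epochs must match and something must actually have been stored.
    static boolean shouldConsultPlugin(int storedTopologyEpoch, int currentTopologyEpoch) {
        return storedTopologyEpoch != NO_TOPOLOGY_EPOCH
            && storedTopologyEpoch == currentTopologyEpoch;
    }
}
```

For a brand-new group, `epochsMatch(-1, -1)` is true while `shouldConsultPlugin(-1, -1)` is false, so the plugin is skipped when it has nothing to serve.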
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -283,6 +283,39 @@ public void testNewStreamsGroupMetadataRecord() {
)));
}
+ @Test
+ public void testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+ // KIP-1331: the 7-arg overload persists storedTopologyEpoch and lastFailedTopologyEpoch.
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMetadataKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMetadataValue()
+ .setEpoch(42)
+ .setMetadataHash(43)
+ .setValidatedTopologyEpoch(44)
+ .setLastAssignmentConfigs(List.of())
+ .setStoredTopologyEpoch(7)
+ .setLastFailedTopologyEpoch(5),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ GROUP_ID, 42, 43, 44, Map.of(), 7, 5));
+ }
+
+ @Test
+ public void testNewStreamsGroupMetadataRecordFiveArgOverloadDefaultsToMinusOne() {
+ // The 5-arg back-compat overload must persist -1 for both new fields so a routine call site
+ // that hasn't been retrofitted does not accidentally clear a previously-stored epoch.
Review Comment:
The reasoning reads backwards: writing -1 does not preserve a previously stored epoch; it clears it on replay (`setStoredTopologyEpoch(-1)`). The actual invariant is that every production epoch-bump or fence path must use the 7-arg overload; the 5-arg overload persisting -1 is the unsafe fallback, not a safe one. Worth rewording so the comment does not imply that -1 preserves anything.
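A minimal sketch of the replay behaviour described above, assuming replay copies each field of the metadata value into the group state as-is. `MetadataReplaySketch`, `MetadataValue`, `GroupState`, `recordWithDefaults`, and `recordCarryingEpochs` are hypothetical simplifications of `StreamsGroupMetadataValue` and the 5-arg/7-arg helpers, not their real signatures.

```java
// Hypothetical, simplified stand-ins for the metadata record and its replay.
final class MetadataReplaySketch {
    record MetadataValue(int epoch, int storedTopologyEpoch, int lastFailedTopologyEpoch) {}

    static final class GroupState {
        int epoch;
        int storedTopologyEpoch = -1;
        int lastFailedTopologyEpoch = -1;

        // Replay overwrites every field, so a -1 in the record clears the stored value.
        void replay(MetadataValue value) {
            epoch = value.epoch();
            storedTopologyEpoch = value.storedTopologyEpoch();
            lastFailedTopologyEpoch = value.lastFailedTopologyEpoch();
        }
    }

    // Shaped like the 5-arg overload: the new fields default to -1.
    static MetadataValue recordWithDefaults(int epoch) {
        return new MetadataValue(epoch, -1, -1);
    }

    // Shaped like the 7-arg overload: the caller passes the current values through.
    static MetadataValue recordCarryingEpochs(int epoch, GroupState current) {
        return new MetadataValue(epoch, current.storedTopologyEpoch, current.lastFailedTopologyEpoch);
    }
}
```

Bumping the epoch through `recordWithDefaults` and replaying leaves both topology fields at -1, whereas `recordCarryingEpochs` keeps them, which is why the 5-arg path is the unsafe one.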
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -737,17 +738,21 @@ public List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe(
* @param groupIds The IDs of the groups to describe.
* @param committedOffset A specified committed offset corresponding to this shard.
*
- * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
- * If a group is not found, the DescribedGroup will contain the error code and message.
+ * @return A {@link StreamsGroupDescribeResult} bundling the described groups with per-group
+ * storedTopologyEpoch (KIP-1331). If a group is not found, its DescribedGroup carries the
+ * error code and message and is omitted from the stored-epoch map.
*/
- public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+ public StreamsGroupDescribeResult streamsGroupDescribe(
List<String> groupIds,
long committedOffset
) {
final List<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>();
+ final Map<String, Integer> groupIdToStoredTopologyEpochs = new HashMap<>();
groupIds.forEach(groupId -> {
try {
- describedGroups.add(streamsGroup(groupId, committedOffset).asDescribedGroup(committedOffset));
+ StreamsGroup group = streamsGroup(groupId, committedOffset);
+ describedGroups.add(group.asDescribedGroup(committedOffset));
+ groupIdToStoredTopologyEpochs.put(groupId, group.storedTopologyEpoch());
Review Comment:
I think we want a consistent view of the group here, so we need to pass
`committedOffset` into `group.storedTopologyEpoch`.
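A minimal sketch of the consistency issue, using a hypothetical `VersionedInt` that tags each write with the log offset of the record that produced it. It is similar in spirit to Kafka's snapshot-based timeline structures but is not their API. The point is that `get()` can observe writes past `committedOffset`, while `get(committedOffset)` stays consistent with the rest of the describe data, which is already read at `committedOffset`.

```java
import java.util.TreeMap;

// Hypothetical versioned integer: writes are keyed by log offset so reads can be pinned
// to a committed offset.
final class VersionedInt {
    private final TreeMap<Long, Integer> history = new TreeMap<>();

    VersionedInt(int initialValue) {
        // The initial value is visible at every offset.
        history.put(Long.MIN_VALUE, initialValue);
    }

    void set(long offset, int value) {
        history.put(offset, value);
    }

    // Latest value, including writes that may not be committed yet.
    int get() {
        return history.lastEntry().getValue();
    }

    // Value as of committedOffset: the most recent write at or before that offset.
    int get(long committedOffset) {
        return history.floorEntry(committedOffset).getValue();
    }
}
```

With writes at offset 10 (epoch 7) and offset 20 (epoch 8) and a committed offset of 15, `get()` returns 8 but `get(15)` returns 7; the latter is what the suggested `group.storedTopologyEpoch(committedOffset)` would report.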
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -900,16 +901,34 @@ public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
*
* @param groupIds The IDs of the groups to describe.
*
- * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+ * @return A {@link StreamsGroupDescribeResult} containing the described groups and per-group
+ * stored topology epoch (KIP-1331).
*
*/
- public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(
+ public StreamsGroupDescribeResult streamsGroupDescribe(
List<String> groupIds,
long committedOffset
) {
return groupMetadataManager.streamsGroupDescribe(groupIds, committedOffset);
}
+ /**
+ * Validates that a streams group exists and that the given member is a current member of it
+ * (KIP-1331). Read-only; runs against the live state machine, so the caller must schedule this
+ * on the coordinator runtime like any other read.
+ *
+ * @param groupId The group ID.
+ * @param memberId The member ID.
+ * @throws GroupIdNotFoundException if the group does not exist.
+ * @throws UnknownMemberIdException if the member is not in the group.
+ */
+ public void validateStreamsGroupMember(
+ String groupId,
+ String memberId
+ ) {
+ groupMetadataManager.validateStreamsGroupMember(groupId, memberId);
Review Comment:
It probably does not make a big difference in this case, but I think this should take a `committedOffset` to avoid reading potentially uncommitted fencings or leaves.
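A minimal sketch of the fencing/leave case, assuming membership can be viewed as snapshots keyed by log offset. `MembershipHistory` and its methods are hypothetical; the Javadoc above says the real method throws `UnknownMemberIdException`, for which this sketch substitutes `IllegalStateException` to stay dependency-free.

```java
import java.util.Set;
import java.util.TreeMap;

// Hypothetical membership history: each entry is the member set after the record at that offset.
final class MembershipHistory {
    private final TreeMap<Long, Set<String>> snapshots = new TreeMap<>();

    MembershipHistory() {
        snapshots.put(Long.MIN_VALUE, Set.of());
    }

    void apply(long offset, Set<String> members) {
        snapshots.put(offset, Set.copyOf(members));
    }

    // Validates against the latest state, which may include an uncommitted fence or leave.
    void validateLatest(String memberId) {
        check(snapshots.lastEntry().getValue(), memberId);
    }

    // Validates against the state as of committedOffset, as the comment suggests.
    void validateAt(String memberId, long committedOffset) {
        check(snapshots.floorEntry(committedOffset).getValue(), memberId);
    }

    private static void check(Set<String> members, String memberId) {
        if (!members.contains(memberId)) {
            // Stand-in for UnknownMemberIdException in the real code.
            throw new IllegalStateException("Unknown member: " + memberId);
        }
    }
}
```

If member `m1` is removed at offset 20 while the committed offset is still 15, `validateAt("m1", 15)` accepts it, but `validateLatest("m1")` rejects it based on a removal that is not committed yet.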