lucasbru commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3371552600
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -737,17 +738,23 @@ public
List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe(
* @param groupIds The IDs of the groups to describe.
* @param committedOffset A specified committed offset corresponding to
this shard.
*
- * @return A list containing the
StreamsGroupDescribeResponseData.DescribedGroup.
- * If a group is not found, the DescribedGroup will contain the
error code and message.
+ * @return A {@link StreamsGroupDescribeResult} bundling the described
groups with per-group
+ * storedTopologyEpoch (KIP-1331). If a group is not found, its
DescribedGroup carries the
+ * error code and message and is omitted from the stored-epoch map.
*/
- public List<StreamsGroupDescribeResponseData.DescribedGroup>
streamsGroupDescribe(
+ public StreamsGroupDescribeResult streamsGroupDescribe(
List<String> groupIds,
long committedOffset
) {
final List<StreamsGroupDescribeResponseData.DescribedGroup>
describedGroups = new ArrayList<>();
+ final Map<String, Integer> groupIdToStoredTopologyEpochs = new
HashMap<>();
groupIds.forEach(groupId -> {
try {
- describedGroups.add(streamsGroup(groupId,
committedOffset).asDescribedGroup(committedOffset));
+ StreamsGroup group = streamsGroup(groupId, committedOffset);
+ describedGroups.add(group.asDescribedGroup(committedOffset));
+ // KIP-1331: read the stored epoch at the same committedOffset
so describedGroup and the
Review Comment:
Remove the inline comment
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -646,6 +688,56 @@ public void setValidatedTopologyEpoch(int
validatedTopologyEpoch) {
maybeUpdateGroupState();
}
+ /**
+ * @return The topology epoch most recently accepted by the topology
description plugin, or -1 if none.
+ */
+ public int storedTopologyEpoch() {
+ return storedTopologyEpoch.get();
+ }
+
+ /**
+ * @param committedOffset A committed offset corresponding to the desired
snapshot.
+ * @return The topology epoch most recently accepted by the topology
description plugin at the given
Review Comment:
"accepted" is probably not the perfect word here.
I suppose "successfully stored" would describe it better.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2191,7 +2209,10 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
response.setStatus(returnedStatus);
- return new CoordinatorResult<>(records, new
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
+ // KIP-1331: use updatedTopology.topologyEpoch() rather than
group.currentTopologyEpoch(). On the first
Review Comment:
Remove this inline comment. I don't need to keep a history of every
discussion thread in inline comments
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -280,7 +280,29 @@ public void testNewStreamsGroupMetadataRecord() {
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43,
44, Map.of(
"num.standby.replicas", "2"
- )));
+ ), -1, -1));
+ }
+
+ @Test
+ public void
testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+ // KIP-1331: the 7-arg overload persists storedTopologyEpoch and
lastFailedTopologyEpoch.
Review Comment:
Remove inline comment
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8015,6 +8048,29 @@ private CoordinatorResult<HeartbeatResponseData,
CoordinatorRecord> classicGroup
);
}
+ /**
+ * Validates that a streams group exists and that the given member is a
current member of it.
+ * Used by the StreamsGroupTopologyDescriptionUpdate RPC handler
(KIP-1331) to enforce the
Review Comment:
THis seems like a generic valdiation function for me, we don't need to
mention KIP-1331 here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -900,16 +901,37 @@ public
List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
*
* @param groupIds The IDs of the groups to describe.
*
- * @return A list containing the
StreamsGroupDescribeResponseData.DescribedGroup.
+ * @return A {@link StreamsGroupDescribeResult} containing the described
groups and per-group
+ * stored topology epoch (KIP-1331).
*
*/
- public List<StreamsGroupDescribeResponseData.DescribedGroup>
streamsGroupDescribe(
+ public StreamsGroupDescribeResult streamsGroupDescribe(
List<String> groupIds,
long committedOffset
) {
return groupMetadataManager.streamsGroupDescribe(groupIds,
committedOffset);
}
+ /**
+ * Validates that a streams group exists and that the given member is a
current member of it
Review Comment:
Seems like a generic method, we don't need to mention KIP-1331 here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4504,7 +4525,17 @@ private <T> CoordinatorResult<T, CoordinatorRecord>
streamsGroupFenceMember(
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
- records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch,
group.metadataHash(), group.validatedTopologyEpoch(),
group.lastAssignmentConfigs()));
+ // KIP-1331: preserve storedTopologyEpoch/lastFailedTopologyEpoch
across a fence. Fencing a single member
Review Comment:
remove inline comment
##########
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json:
##########
@@ -27,7 +27,11 @@
{ "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions":
"0+", "tag": 0, "default": -1, "type": "int32",
"about": "The topology epoch whose topics are validated to be present in
a valid configuration in the metadata." },
{ "name": "LastAssignmentConfigs","taggedVersions": "0+",
"nullableVersions": "0+","tag": 1, "default": null,
- "type": "[]LastAssignmentConfig", "about": "The last used configuration
parameters as key-value pairs." }
+ "type": "[]LastAssignmentConfig", "about": "The last used configuration
parameters as key-value pairs." },
+ { "name": "StoredTopologyEpoch", "versions": "0+", "taggedVersions": "0+",
"tag": 2, "default": -1, "type": "int32",
Review Comment:
Sorry for raising this late: the names `StoredTopologyEpoch` /
`LastFailedTopologyEpoch` read ambiguously alongside the existing
`topologyEpoch` (the group's current topology epoch). The description doesn't
have its own epoch — it inherits the topology's — so the field really tracks
*which* topology epoch we have a stored/failed description for. Suggest
renaming to `StoredDescriptionTopologyEpoch` / `FailedDescriptionTopologyEpoch`
(parallel to the existing `validatedTopologyEpoch`); same applies to the Java
fields/accessors/setters and the `storedTopologyEpochs` map in
`StreamsGroupDescribeResult`. Should be a straight search/replace.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java:
##########
@@ -26,14 +26,28 @@
/**
* A simple record to hold the result of a StreamsGroupHeartbeat request.
*
- * @param data The data to be returned to the client.
- * @param creatableTopics The internal topics to be created.
+ * @param data The data to be returned to the client.
+ * @param creatableTopics The internal topics to be created.
+ * @param currentTopologyEpoch The topology epoch the group is operating at
after this heartbeat, or -1 if the
+ * group has no topology yet. The service layer
uses this to decide whether to set
+ * TopologyDescriptionRequired on the response
(KIP-1331).
*/
-public record StreamsGroupHeartbeatResult(StreamsGroupHeartbeatResponseData
data, Map<String, CreatableTopic> creatableTopics) {
+public record StreamsGroupHeartbeatResult(
+ StreamsGroupHeartbeatResponseData data,
+ Map<String, CreatableTopic> creatableTopics,
+ int currentTopologyEpoch
+) {
public StreamsGroupHeartbeatResult {
Objects.requireNonNull(data);
creatableTopics =
Collections.unmodifiableMap(Objects.requireNonNull(creatableTopics));
}
+ public StreamsGroupHeartbeatResult(
Review Comment:
`StreamsGroupHeartbeatResult` is internal coordinator plumbing, not public
API — there's no back-compat to preserve. The 2-arg overload has exactly one
remaining caller (`GroupCoordinatorServiceTest.java:516`), and
`testTwoArgConstructorDefaultsTopologyEpochToMinusOne` only exists to test the
overload itself. Updating that one test line to pass `-1` explicitly lets us
drop both the overload and the test.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupDescribeResult.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles the per-group describe response data with the persisted topology
epoch the plugin has stored
Review Comment:
I think for this comment, I'd keep the main javadoc description indepedent
of KIP-1331 -- And put a shorter description of the purpose of
storedTopologyEpochs into the parameter description of storedTopologyEpochs,
where it makes sense to mention KIP-1331.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2084,7 +2091,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
- records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
+ // KIP-1331: persist storedTopologyEpoch/lastFailedTopologyEpoch
unchanged on a routine epoch bump.
Review Comment:
Remove this inline comment. I don't need to keep a history of every
discussion thread in inline comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]