Copilot commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3361787382
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -242,6 +254,10 @@ public StreamsGroup(
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+ this.storedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+ this.storedTopologyEpoch.set(-1);
+ this.lastFailedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+ this.lastFailedTopologyEpoch.set(-1);
Review Comment:
`validatedTopologyEpoch` is left at the `TimelineInteger` default (0), but
the StreamsGroup metadata schema and heartbeat logic use `-1` as the sentinel
for “no validated topology yet”. As a result,
`validatedTopologyEpoch != group.validatedTopologyEpoch()` can evaluate to true
on a newly created in-memory group (or any group created outside replay),
triggering an unnecessary epoch bump / rebalance decision. Initialize
`validatedTopologyEpoch` to `-1` in the constructor for consistency with the
message default and the new topology-epoch fields.
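
To make the mismatch concrete, here is a minimal, self-contained sketch. It does not use the Kafka classes: `IntHolder` is a hypothetical stand-in for a holder that starts at 0 (as described above for `TimelineInteger`), and `epochChanged` is a hypothetical stand-in for the comparison in the heartbeat path.

```java
// Illustrative sketch only; SentinelSketch, IntHolder and epochChanged are
// hypothetical names and are not part of the Kafka code base.
public class SentinelSketch {
    // Sentinel meaning "no validated topology yet", as used by the schema default.
    public static final int NO_VALIDATED_TOPOLOGY = -1;

    /** Stand-in for an integer holder that starts at 0 unless explicitly set. */
    public static final class IntHolder {
        private int value; // Java field default is 0

        public void set(int newValue) {
            value = newValue;
        }

        public int get() {
            return value;
        }
    }

    /** Returns true when the incoming epoch differs from the in-memory epoch. */
    public static boolean epochChanged(int incomingEpoch, IntHolder inMemoryEpoch) {
        return incomingEpoch != inMemoryEpoch.get();
    }

    public static void main(String[] args) {
        // Holder left at its default, like validatedTopologyEpoch in the diff.
        IntHolder leftAtDefault = new IntHolder();

        // Holder initialized to the sentinel, like the two new fields in the diff.
        IntHolder initialized = new IntHolder();
        initialized.set(NO_VALIDATED_TOPOLOGY);

        // The default-0 holder reports a change against the sentinel; the
        // initialized holder does not.
        System.out.println(epochChanged(NO_VALIDATED_TOPOLOGY, leftAtDefault));
        System.out.println(epochChanged(NO_VALIDATED_TOPOLOGY, initialized));
    }
}
```

In this sketch only the holder left at its default disagrees with the sentinel, which is the spurious difference the constructor change would avoid.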
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -283,6 +283,39 @@ public void testNewStreamsGroupMetadataRecord() {
)));
}
+ @Test
+ public void testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+ // KIP-1331: the 7-arg overload persists storedTopologyEpoch and lastFailedTopologyEpoch.
+ CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+ new StreamsGroupMetadataKey()
+ .setGroupId(GROUP_ID),
+ new ApiMessageAndVersion(
+ new StreamsGroupMetadataValue()
+ .setEpoch(42)
+ .setMetadataHash(43)
+ .setValidatedTopologyEpoch(44)
+ .setLastAssignmentConfigs(List.of())
+ .setStoredTopologyEpoch(7)
+ .setLastFailedTopologyEpoch(5),
+ (short) 0
+ )
+ );
+
+ assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ GROUP_ID, 42, 43, 44, Map.of(), 7, 5));
+ }
+
+ @Test
+ public void testNewStreamsGroupMetadataRecordFiveArgOverloadDefaultsToMinusOne() {
+ // The 5-arg back-compat overload must persist -1 for both new fields so a routine call site
+ // that hasn't been retrofitted does not accidentally clear a previously-stored epoch.
Review Comment:
The comment on the 5-arg `newStreamsGroupMetadataRecord` overload is
misleading. Persisting `-1` for the new fields records the default/unknown
state, so a record written through this overload after a real epoch has been
stored would overwrite that epoch with `-1` (as your replay test later in
`GroupMetadataManagerTest` demonstrates). Please update the comment so future
readers do not assume the overload is safe to use when they intend to preserve
stored epochs.
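
A rough sketch of the overwrite, under the assumption that replay simply replaces the stored value with the newest record. `ReplaySketch`, `MetadataValue`, `shortOverload` and `fullOverload` are hypothetical stand-ins and not the real helpers or generated message classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; every name here is hypothetical.
public class ReplaySketch {
    /** Simplified metadata value; field names mirror the ones in the diff. */
    public static final class MetadataValue {
        public final int epoch;
        public final int storedTopologyEpoch;
        public final int lastFailedTopologyEpoch;

        public MetadataValue(int epoch, int storedTopologyEpoch, int lastFailedTopologyEpoch) {
            this.epoch = epoch;
            this.storedTopologyEpoch = storedTopologyEpoch;
            this.lastFailedTopologyEpoch = lastFailedTopologyEpoch;
        }
    }

    private final Map<String, MetadataValue> groups = new HashMap<>();

    /** Assumed replay semantics: the newest record replaces the whole value. */
    public void replay(String groupId, MetadataValue value) {
        groups.put(groupId, value);
    }

    public MetadataValue get(String groupId) {
        return groups.get(groupId);
    }

    /** Stand-in for the full overload: the caller supplies both new fields. */
    public static MetadataValue fullOverload(int epoch, int stored, int lastFailed) {
        return new MetadataValue(epoch, stored, lastFailed);
    }

    /** Stand-in for the short overload: both new fields default to -1. */
    public static MetadataValue shortOverload(int epoch) {
        return new MetadataValue(epoch, -1, -1);
    }

    public static void main(String[] args) {
        ReplaySketch state = new ReplaySketch();
        state.replay("group", fullOverload(42, 7, 5));
        // A later record built through the short overload carries -1 for both fields.
        state.replay("group", shortOverload(43));
        System.out.println(state.get("group").storedTopologyEpoch);
        System.out.println(state.get("group").lastFailedTopologyEpoch);
    }
}
```

Under that assumption the second record resets both fields to `-1`, so the short overload clears stored epochs instead of preserving them.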