Copilot commented on code in PR #22464:
URL: https://github.com/apache/kafka/pull/22464#discussion_r3361787382


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -242,6 +254,10 @@ public StreamsGroup(
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+        this.storedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+        this.storedTopologyEpoch.set(-1);
+        this.lastFailedTopologyEpoch = new TimelineInteger(snapshotRegistry);
+        this.lastFailedTopologyEpoch.set(-1);

Review Comment:
   `validatedTopologyEpoch` is left at the `TimelineInteger` default (0), but 
the StreamsGroup metadata schema and heartbeat logic use `-1` as the sentinel 
for “no validated topology yet”. This can cause `validatedTopologyEpoch != 
group.validatedTopologyEpoch()` to evaluate true on a newly created in-memory 
group (or any group created outside replay), triggering an unnecessary epoch 
bump / rebalance decision. Initialize `validatedTopologyEpoch` to `-1` in the 
constructor for consistency with the message default and the new topology-epoch 
fields.
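
   A minimal, self-contained sketch of the mismatch described above. It does 
not use Kafka's `TimelineInteger`; `IntCell`, `epochChanged`, and `NO_EPOCH` 
are hypothetical stand-ins that only model "defaults to 0" versus "explicitly 
set to `-1`":

```java
// Illustrative only: IntCell is a stand-in, not Kafka's TimelineInteger.
public class SentinelInitSketch {
    // Sentinel for "no validated topology yet", as described in the review.
    static final int NO_EPOCH = -1;

    static final class IntCell {
        private int value; // Java int fields default to 0

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
        }
    }

    // Hypothetical check shaped like the comparison quoted in the review:
    // a difference is treated as a reason to bump the epoch.
    static boolean epochChanged(int incomingEpoch, IntCell groupEpoch) {
        return incomingEpoch != groupEpoch.get();
    }

    public static void main(String[] args) {
        IntCell leftAtDefault = new IntCell();      // stays at 0
        IntCell setToSentinel = new IntCell();
        setToSentinel.set(NO_EPOCH);                // explicitly -1

        // Incoming value carries the sentinel because nothing was validated yet.
        System.out.println(epochChanged(NO_EPOCH, leftAtDefault)); // true: spurious change
        System.out.println(epochChanged(NO_EPOCH, setToSentinel)); // false: no change
    }
}
```

   With the cell left at its default, the sentinel compares unequal and looks 
like a change; setting it to `-1` up front removes the false positive.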



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -283,6 +283,39 @@ public void testNewStreamsGroupMetadataRecord() {
         )));
     }
 
+    @Test
+    public void testNewStreamsGroupMetadataRecordWithTopologyDescriptionEpochs() {
+        // KIP-1331: the 7-arg overload persists storedTopologyEpoch and lastFailedTopologyEpoch.
+        CoordinatorRecord expectedRecord = CoordinatorRecord.record(
+            new StreamsGroupMetadataKey()
+                .setGroupId(GROUP_ID),
+            new ApiMessageAndVersion(
+                new StreamsGroupMetadataValue()
+                    .setEpoch(42)
+                    .setMetadataHash(43)
+                    .setValidatedTopologyEpoch(44)
+                    .setLastAssignmentConfigs(List.of())
+                    .setStoredTopologyEpoch(7)
+                    .setLastFailedTopologyEpoch(5),
+                (short) 0
+            )
+        );
+
+        assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+            GROUP_ID, 42, 43, 44, Map.of(), 7, 5));
+    }
+
+    @Test
+    public void testNewStreamsGroupMetadataRecordFiveArgOverloadDefaultsToMinusOne() {
+        // The 5-arg back-compat overload must persist -1 for both new fields so a routine call site
+        // that hasn't been retrofitted does not accidentally clear a previously-stored epoch.

Review Comment:
   The comment on the 5-arg `newStreamsGroupMetadataRecord` overload is 
misleading: persisting `-1` for the new fields represents the default/unknown 
state, and a record written this way after a value other than `-1` has been 
stored would overwrite that value (as your replay test later in 
`GroupMetadataManagerTest` demonstrates). Updating this comment will prevent 
future readers from assuming the overload is safe to use when they intend to 
preserve stored epochs.
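
   A rough sketch of the overwrite, assuming last-record-wins replay. 
`MetadataValue`, `GroupState`, and both `newRecord` overloads are hypothetical 
simplifications, not the real `StreamsGroupMetadataValue` or 
`StreamsCoordinatorRecordHelpers` API:

```java
// Illustrative only: simplified stand-ins for the record and the replayed state.
public class ReplayOverwriteSketch {
    static final int UNKNOWN = -1;

    // Hypothetical persisted value holding only the two new fields.
    static final class MetadataValue {
        final int storedTopologyEpoch;
        final int lastFailedTopologyEpoch;

        MetadataValue(int storedTopologyEpoch, int lastFailedTopologyEpoch) {
            this.storedTopologyEpoch = storedTopologyEpoch;
            this.lastFailedTopologyEpoch = lastFailedTopologyEpoch;
        }
    }

    // Shaped like the longer overload: the caller supplies both epochs.
    static MetadataValue newRecord(int storedEpoch, int lastFailedEpoch) {
        return new MetadataValue(storedEpoch, lastFailedEpoch);
    }

    // Shaped like the shorter back-compat overload: both epochs default to -1.
    static MetadataValue newRecord() {
        return newRecord(UNKNOWN, UNKNOWN);
    }

    static final class GroupState {
        int storedTopologyEpoch = UNKNOWN;
        int lastFailedTopologyEpoch = UNKNOWN;

        // Assumed replay semantics: the latest record replaces the in-memory values.
        void replay(MetadataValue value) {
            storedTopologyEpoch = value.storedTopologyEpoch;
            lastFailedTopologyEpoch = value.lastFailedTopologyEpoch;
        }
    }

    public static void main(String[] args) {
        GroupState group = new GroupState();

        group.replay(newRecord(7, 5));
        System.out.println(group.storedTopologyEpoch); // 7

        // A later record built with the defaulting overload resets both fields.
        group.replay(newRecord());
        System.out.println(group.storedTopologyEpoch); // -1
    }
}
```

   Under these assumptions the defaulting overload does not preserve a stored 
epoch; it replaces it with `-1`, which is the behavior the code comment should 
describe.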



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

