Copilot commented on code in PR #21655:
URL: https://github.com/apache/kafka/pull/21655#discussion_r2895135889
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java:
##########
@@ -152,9 +153,16 @@ public void testFromCurrentAssignmentRecordWithMismatchedEpochs() {
.setPartitions(Arrays.asList(1, 2, 3))
.setAssignmentEpochs(Arrays.asList(10, 11))); // Only 2 epochs for 3 partitions
- assertThrows(IllegalStateException.class, () ->
- TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks, List.of(), List.of(), 100)
- );
+ try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TasksTupleWithEpochs.class)) {
+ TasksTupleWithEpochs tuple = TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks, List.of(), List.of(), 100);
+ assertEquals(
+ Map.of(SUBTOPOLOGY_1, Map.of(1, 100, 2, 100, 3, 100)),
+ tuple.activeTasksWithEpochs()
+ );
+ assertEquals(1, appender.getMessages("ERROR").stream()
+ .filter(msg -> msg.contains("Size of assignment epochs 2 is not equal to partitions 3 for subtopology 1."))
+ .count());
+ }
Review Comment:
This test covers the mismatched-size case, but there’s still an important
legacy edge case: `assignmentEpochs` may be present as an empty list (vs
`null`). Given the parsing logic change, please add coverage that
`setAssignmentEpochs(List.of())` is treated as legacy (fallback to member
epoch) and does not emit an ERROR log (only true mismatches should).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java:
##########
@@ -137,19 +141,16 @@ private static Map<String, Map<Integer, Integer>> parseActiveTasksWithEpochs(
Map<Integer, Integer> partitionsWithEpochs = new HashMap<>();
- if (epochs != null && !epochs.isEmpty()) {
- if (epochs.size() != partitions.size()) {
- throw new IllegalStateException(
- "Assignment epochs must be provided for all partitions. " +
- "Subtopology " + subtopologyId + " has " + partitions.size() +
- " partitions but " + epochs.size() + " epochs"
- );
- }
-
+ if (epochs != null && epochs.size() == partitions.size()) {
for (int i = 0; i < partitions.size(); i++) {
partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
}
} else {
+ if (epochs != null) {
+ log.error("Size of assignment epochs {} is not equal to partitions {} for subtopology {}. " +
+ "Using default epoch {} for all partitions.",
+ epochs.size(), partitions.size(), subtopologyId, memberEpoch);
+ }
// Legacy record without epochs: use member epoch as default
Review Comment:
`assignmentEpochs` previously treated an empty list as the legacy “no
epochs” case (no exception/log, just fallback to `memberEpoch`). With the new
condition `epochs != null && epochs.size() == partitions.size()`, an empty
`epochs` list (size 0) with non-empty `partitions` will now log an ERROR and
fall back, which can spam logs if legacy records deserialize to an empty list
rather than `null`. Consider treating `epochs == null || epochs.isEmpty()` as
legacy (no error), and only logging when `epochs` is non-empty but its size
mismatches `partitions`.
```suggestion
if (epochs == null || epochs.isEmpty()) {
    // Legacy record without epochs (or deserialized as empty): use member epoch as default
    for (Integer partition : partitions) {
        partitionsWithEpochs.put(partition, memberEpoch);
    }
} else if (epochs.size() == partitions.size()) {
    for (int i = 0; i < partitions.size(); i++) {
        partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
    }
} else {
    // Non-empty epochs list with size mismatch: log error and fall back to member epoch
    log.error("Size of assignment epochs {} is not equal to partitions {} for subtopology {}. " +
        "Using default epoch {} for all partitions.",
        epochs.size(), partitions.size(), subtopologyId, memberEpoch);
```
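To make the three cases in this comment concrete, here is a minimal standalone sketch of the branching, not the actual Kafka code. The class `EpochFallbackSketch`, the method `partitionsWithEpochs`, and the `ERRORS` list (a stand-in for the real logger and for `LogCaptureAppender`) are hypothetical names introduced only for illustration. The sketch also assumes that the mismatch branch falls back to `memberEpoch` for every partition, as the log message states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the Kafka code base.
class EpochFallbackSketch {
    // Stand-in for the logger: collects the ERROR messages that would be emitted.
    static final List<String> ERRORS = new ArrayList<>();

    static Map<Integer, Integer> partitionsWithEpochs(
            String subtopologyId,
            List<Integer> partitions,
            List<Integer> epochs,
            int memberEpoch) {
        Map<Integer, Integer> result = new HashMap<>();
        if (epochs == null || epochs.isEmpty()) {
            // Legacy record (null or empty epochs): silent fallback to the member epoch.
            for (Integer partition : partitions) {
                result.put(partition, memberEpoch);
            }
        } else if (epochs.size() == partitions.size()) {
            // Well-formed record: pair each partition with its own epoch by index.
            for (int i = 0; i < partitions.size(); i++) {
                result.put(partitions.get(i), epochs.get(i));
            }
        } else {
            // True mismatch: record an error, then fall back to the member epoch.
            ERRORS.add(String.format(
                "Size of assignment epochs %d is not equal to partitions %d for subtopology %s. "
                    + "Using default epoch %d for all partitions.",
                epochs.size(), partitions.size(), subtopologyId, memberEpoch));
            for (Integer partition : partitions) {
                result.put(partition, memberEpoch);
            }
        }
        return result;
    }
}
```

With this shape, `List.of()` and `null` both take the silent legacy path, while `Arrays.asList(10, 11)` against three partitions yields the same fallback map plus exactly one error entry, which is the distinction the requested test coverage would pin down.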