lucasbru commented on code in PR #20760:
URL: https://github.com/apache/kafka/pull/20760#discussion_r2455516109
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1120,4 +1130,54 @@ public void setLastAssignmentConfigs(Map<String, String>
lastAssignmentConfigs)
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
}
}
+
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ *
+ * @param member The member whose assignments are being validated.
+ * @param receivedMemberEpoch The received member epoch.
+ * @return A validator for per-partition validation.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ final StreamsGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ // Retrieve topology once for all partitions - not per partition!
+ final StreamsTopology streamsTopology = topology.get().orElseThrow(()
->
+ new StaleMemberEpochException("Topology is not available for
offset commit validation."));
Review Comment:
We do not allow removing the topology, so I think this may almost
impossible. We'd have to recreate the group of the same name, and get the same
member ID back to reach this point. If that would ever happen, I think fencing
the member would be okay.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]