apoorvmittal10 commented on code in PR #16828:
URL: https://github.com/apache/kafka/pull/16828#discussion_r1710310089
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -677,6 +705,73 @@ private void maybeRemovePartitionEpoch(
}
}
+ /**
+ * Removes the partition epochs based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected one.
+ * package-private for testing.
+ */
+ void removePartitionEpochs(
+ Map<Uuid, Set<Integer>> assignment,
+ int expectedEpoch
+ ) {
+ assignment.forEach((topicId, assignedPartitions) -> {
+ currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull != null) {
+ assignedPartitions.forEach(partitionId -> {
+ Integer prevValue = partitionsOrNull.remove(partitionId);
+ if (prevValue != expectedEpoch) {
+ throw new IllegalStateException(
+ String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
+ "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+ }
+ });
+ if (partitionsOrNull.isEmpty()) {
+ return null;
+ } else {
+ return partitionsOrNull;
+ }
+ } else {
+ throw new IllegalStateException(
+ String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
+ expectedEpoch, topicId));
+ }
+ });
+ });
+ }
+
+ /**
+ * Adds the partitions epoch based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param epoch The new epoch.
+ * @throws IllegalStateException if the partition already has an epoch assigned.
+ * package-private for testing.
+ */
+ void addPartitionEpochs(
+ Map<Uuid, Set<Integer>> assignment,
+ int epoch
+ ) {
+ assignment.forEach((topicId, assignedPartitions) -> {
+ currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull == null) {
+ partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
+ }
+ for (Integer partitionId : assignedPartitions) {
+ Integer prevValue = partitionsOrNull.put(partitionId, epoch);
Review Comment:
As I understand it, `currentPartitionEpoch` is backed by a `TimelineHashMap`,
so only the soft (in-memory) state is updated here. If an exception occurs, the
transaction is not persisted and the soft state gets overridden on the next
successful update.
This is existing code from ConsumerGroup.java that the current PR puts back. We
initially moved it to ModernGroup so that both Share and Consumer groups could
use it, but later realized that Share Groups do not need this functionality.
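
To make the point about soft state concrete, here is a minimal sketch of the idea. It does not use the real `TimelineHashMap` or `SnapshotRegistry`: the class `SnapshotRevertSketch`, its `tryUpdate` and `epochs` methods, and the use of topic names instead of `Uuid` are all hypothetical, and a plain `HashMap` copy stands in for the snapshot. Whether the coordinator discards a failed update by reverting to a snapshot in exactly this way is an assumption here, not something confirmed by the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the group's soft state; not Kafka code.
class SnapshotRevertSketch {
    // topic name -> (partition id -> epoch); the real code keys by Uuid.
    private Map<String, Map<Integer, Integer>> currentPartitionEpoch = new HashMap<>();

    // Mirrors the shape of addPartitionEpochs: mutate first, then throw
    // if a partition already had an epoch.
    void addPartitionEpochs(Map<String, Set<Integer>> assignment, int epoch) {
        assignment.forEach((topic, partitions) -> {
            Map<Integer, Integer> epochs =
                currentPartitionEpoch.computeIfAbsent(topic, k -> new HashMap<>());
            for (Integer partitionId : partitions) {
                Integer prevValue = epochs.put(partitionId, epoch);
                if (prevValue != null) {
                    throw new IllegalStateException(String.format(
                        "Cannot set the epoch of %s-%d to %d because it is already owned at epoch %d",
                        topic, partitionId, epoch, prevValue));
                }
            }
        });
    }

    // Assumed behavior: take a snapshot, apply the update, and roll the
    // soft state back to the snapshot if the update throws.
    boolean tryUpdate(Map<String, Set<Integer>> assignment, int epoch) {
        Map<String, Map<Integer, Integer>> snapshot = new HashMap<>();
        currentPartitionEpoch.forEach((t, e) -> snapshot.put(t, new HashMap<>(e)));
        try {
            addPartitionEpochs(assignment, epoch);
            return true;
        } catch (IllegalStateException e) {
            currentPartitionEpoch = snapshot; // discard the partial mutation
            return false;
        }
    }

    Map<String, Map<Integer, Integer>> epochs() {
        return currentPartitionEpoch;
    }

    public static void main(String[] args) {
        SnapshotRevertSketch group = new SnapshotRevertSketch();
        System.out.println(group.tryUpdate(Map.of("t", Set.of(0)), 1));    // succeeds
        System.out.println(group.tryUpdate(Map.of("t", Set.of(0, 1)), 2)); // fails, rolled back
        System.out.println(group.epochs()); // only partition 0 at epoch 1 remains
    }
}
```

In this sketch the second update mutates the map before it throws, but the caller never observes the partial change because the state is restored from the snapshot.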