squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2565519871


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1087,8 +1100,10 @@ void addPartitionEpochs(
                     partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
                 }
                 for (Integer partitionId : assignedPartitions) {
-                    Integer prevValue = partitionsOrNull.put(partitionId, epoch);
-                    if (prevValue != null) {
+                    Integer prevValue = partitionsOrNull.get(partitionId);
+                    if (prevValue == null || prevValue <= epoch) {

Review Comment:
   Actually I was wrong. It's possible to see a replacement with the same epoch:
   
   1. Member A unsubscribes from `bar` at epoch N - 1. `bar` is pending revocation due to the fix for KAFKA-19431. `Member A { epoch: N - 1, assigned partitions: [foo], pending revocation: [bar] }`
   2. A new assignment is available with epoch N.
   3. Member A yields `bar`. The epoch is bumped. `Member A { epoch: N, assigned partitions: [], pending revocation: [foo] }`
   4. Member A yields `foo`. `Member A { epoch: N, assigned partitions: [], pending revocation: [] }`
   5. Member B is assigned `foo`. `Member B { epoch: N, assigned partitions: [foo] }`
   
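   When the record from step 4 is dropped by compaction, replay goes straight from step 3 to step 5: `foo` is still recorded with Member A's epoch N when Member B's record arrives, so `foo`'s partition epoch is replaced with an identical epoch.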
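
To make the scenario concrete, here is a minimal sketch of the replay, assuming a plain `HashMap` in place of the `TimelineHashMap` and a hypothetical `addPartitionEpoch` helper that mirrors only the condition shown in the diff (the class name, the `allowEqual` flag, and the partition id and epoch values are illustrative, not taken from the PR). It shows why a strict `<` check would reject Member B's record while `<=` accepts it:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionEpochReplaySketch {

    // Hypothetical helper: records the epoch for a partition only if there is
    // no previous epoch or the previous epoch is not newer than the incoming one.
    // allowEqual = true mirrors `prevValue <= epoch` from the diff;
    // allowEqual = false models a stricter `prevValue < epoch` check.
    static boolean addPartitionEpoch(
        Map<Integer, Integer> partitionEpochs,
        int partitionId,
        int epoch,
        boolean allowEqual
    ) {
        Integer prevValue = partitionEpochs.get(partitionId);
        boolean accept = prevValue == null
            || (allowEqual ? prevValue <= epoch : prevValue < epoch);
        if (accept) {
            partitionEpochs.put(partitionId, epoch);
        }
        return accept;
    }

    public static void main(String[] args) {
        int epochN = 10; // illustrative value for "epoch N"
        int foo = 0;     // illustrative partition id for "foo"

        for (boolean allowEqual : new boolean[] {true, false}) {
            Map<Integer, Integer> partitionEpochs = new HashMap<>();

            // Step 3: Member A at epoch N still holds foo (pending revocation).
            addPartitionEpoch(partitionEpochs, foo, epochN, allowEqual);

            // Step 4 (Member A yields foo) was dropped by compaction,
            // so nothing clears foo's epoch before the next record.

            // Step 5: Member B is assigned foo, also at epoch N.
            boolean accepted = addPartitionEpoch(partitionEpochs, foo, epochN, allowEqual);

            System.out.println("allowEqual=" + allowEqual + " accepted=" + accepted);
        }
    }
}
```

With `allowEqual` set to `true` the second call is accepted, and with `false` it is rejected, which is the case the `<=` in the diff is meant to cover.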



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
