jolshan commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
             pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
                 topicOffsets.forEach((topicName, partitionOffsets) -> {
                     partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-                        log.debug("Committed transaction offset commit for producer id {} in group {} " +
-                            "with topic {}, partition {}, and offset {}.",
-                            producerId, groupId, topicName, partitionId, offsetAndMetadata);
-                        offsets.put(
+                        OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
                             groupId,
                             topicName,
-                            partitionId,
-                            offsetAndMetadata
+                            partitionId
                         );
+
+                        // We always keep the most recent committed offset when we have a mix of transactional and regular
+                        // offset commits. Without preserving information of the commit record offset, compaction of the
+                        // __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+                        if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+                            log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+                                "with topic {} and partition {}.",
+                                offsetAndMetadata, producerId, groupId, topicName, partitionId);
+                            offsets.put(

Review Comment:
   If the offset is not the latest offset, were we incorrectly saying that the last transactional commit offset was the latest one?
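
   For context, the guard in the hunk above can be illustrated with a minimal standalone sketch. It is not the Kafka implementation: `RecordOffsetGuardSketch`, `Commit`, the string key, and both replay methods are hypothetical stand-ins, and it assumes a regular commit is materialized as soon as it is replayed. It only shows how comparing `recordOffset` stops an older pending transactional commit from overwriting a newer regular one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; none of these names are the Kafka classes.
final class RecordOffsetGuardSketch {

    // Stand-in for OffsetAndMetadata with only the two fields needed here.
    static final class Commit {
        final long committedOffset; // consumer position stored by the commit
        final long recordOffset;    // position of the commit record in the log

        Commit(long committedOffset, long recordOffset) {
            this.committedOffset = committedOffset;
            this.recordOffset = recordOffset;
        }
    }

    // Materialized offsets, keyed by an assumed "group/topic/partition" string.
    private final Map<String, Commit> offsets = new HashMap<>();

    // Assumption: a regular commit is materialized as soon as it is replayed.
    void replayRegular(String key, Commit commit) {
        offsets.put(key, commit);
    }

    // A pending transactional commit is materialized when the commit marker is
    // replayed, but only if its record is newer than the one already stored.
    boolean replayCommittedTransactional(String key, Commit pending) {
        Commit existing = offsets.get(key);
        if (existing == null || pending.recordOffset > existing.recordOffset) {
            offsets.put(key, pending);
            return true;
        }
        return false;
    }

    Commit get(String key) {
        return offsets.get(key);
    }

    public static void main(String[] args) {
        RecordOffsetGuardSketch state = new RecordOffsetGuardSketch();
        String key = "group/topic/0";

        Commit pending = new Commit(100L, 10L);          // transactional, written first
        state.replayRegular(key, new Commit(120L, 11L)); // regular, written later
        boolean applied = state.replayCommittedTransactional(key, pending);

        System.out.println(applied + " " + state.get(key).committedOffset);
    }
}
```

   In this scenario the transactional commit record sits at log position 10 and the regular commit at position 11, so the guard skips the pending commit and the materialized offset stays at 120. Without the comparison, the unconditional `put` would have replaced it with 100.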



