jeffkbkim commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1457909168


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -685,6 +712,20 @@ private boolean hasPendingTransactionalOffsets(
         return false;
     }
 
+    /**
+     * @return true iff there is a committed offset in the main offset store 
for the
+     * given group, topic and partition.
+     *
+     * Package private for testing.
+     */
+    boolean hadCommittedOffset(

Review Comment:
   nit: should this be hasCommittedOffset?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -982,12 +1040,15 @@ public void replayEndTransactionMarker(
                         log.debug("Committed transaction offset commit for 
producer id {} in group {} " +
                             "with topic {}, partition {}, and offset {}.",
                             producerId, groupId, topicName, partitionId, 
offsetAndMetadata);
-                        offsets.put(
+                        OffsetAndMetadata previousValue = offsets.put(
                             groupId,
                             topicName,
                             partitionId,
                             offsetAndMetadata
                         );
+                        if (previousValue == null) {
+                            metrics.incrementNumOffsets();
+                        }

Review Comment:
   thanks for adding this!



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -604,22 +602,23 @@ public CoordinatorResult<OffsetDeleteResponseData, 
Record> deleteOffsets(
                     )
                 );
             } else {
-                final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic == null ?
-                    null : offsetsByTopic.get(topic.name());
-                if (offsetsByPartition != null) {
-                    topic.partitions().forEach(partition -> {
-                        if 
(offsetsByPartition.containsKey(partition.partitionIndex())) {
-                            responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
-                                .setPartitionIndex(partition.partitionIndex())
-                            );
-                            
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
-                                request.groupId(),
-                                topic.name(),
-                                partition.partitionIndex()
-                            ));
-                        }
-                    });
-                }
+                topic.partitions().forEach(partition -> {
+                    // We always add the partition to the response.
+                    responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                        .setPartitionIndex(partition.partitionIndex())
+                    );
+
+                    // A tombstone is written if an offset is present is the 
main storage or

Review Comment:
   nit: "is present in"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to