AndrewJSchofield commented on code in PR #19861:
URL: https://github.com/apache/kafka/pull/19861#discussion_r2121064720


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -644,8 +632,7 @@ public CoordinatorResult<Void, CoordinatorRecord> 
maybeCleanupShareState(Set<Uui
     /**
      * Util method to generate a ShareSnapshot or ShareUpdate type record for 
a key, based on various conditions.
      * <p>
-     * If no snapshot has been created for the key => create a new 
ShareSnapshot record
-     * else if number of ShareUpdate records for key >= max allowed per 
snapshot per key => create a new ShareSnapshot record
+     * Ff number of ShareUpdate records for key >= max allowed per snapshot 
per key or stateEpoch is highest seen => create a new ShareSnapshot record

Review Comment:
   nit: What's "Ff"?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -658,28 +645,14 @@ private CoordinatorRecord generateShareStateRecord(
     ) {
         long timestamp = time.milliseconds();
         int updatesPerSnapshotLimit = 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
-        if (!shareStateMap.containsKey(key)) {
-            // Since this is the first time we are getting a write request for 
key, we should be creating a share snapshot record.
-            // The incoming partition data could have overlapping state 
batches, we must merge them
-            return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
-                key.groupId(), key.topicId(), partitionData.partition(),
-                new ShareGroupOffset.Builder()
-                    .setSnapshotEpoch(0)
-                    .setStartOffset(partitionData.startOffset())
-                    .setLeaderEpoch(partitionData.leaderEpoch())
-                    .setStateEpoch(partitionData.stateEpoch())
-                    .setStateBatches(mergeBatches(List.of(), partitionData))
-                    .setCreateTimestamp(timestamp)
-                    .setWriteTimestamp(timestamp)
-                    .build());
-        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= 
updatesPerSnapshotLimit || partitionData.stateEpoch() > 
shareStateMap.get(key).stateEpoch()) {
+        if (snapshotUpdateCount.getOrDefault(key, 0) >= 
updatesPerSnapshotLimit || partitionData.stateEpoch() > 
shareStateMap.get(key).stateEpoch()) {

Review Comment:
   I believe it's possible for `shareStateMap` not to contain the key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to