smjn commented on code in PR #19861:
URL: https://github.com/apache/kafka/pull/19861#discussion_r2122975078


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -658,28 +645,14 @@ private CoordinatorRecord generateShareStateRecord(
     ) {
         long timestamp = time.milliseconds();
         int updatesPerSnapshotLimit = 
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
-        if (!shareStateMap.containsKey(key)) {
-            // Since this is the first time we are getting a write request for 
key, we should be creating a share snapshot record.
-            // The incoming partition data could have overlapping state 
batches, we must merge them
-            return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
-                key.groupId(), key.topicId(), partitionData.partition(),
-                new ShareGroupOffset.Builder()
-                    .setSnapshotEpoch(0)
-                    .setStartOffset(partitionData.startOffset())
-                    .setLeaderEpoch(partitionData.leaderEpoch())
-                    .setStateEpoch(partitionData.stateEpoch())
-                    .setStateBatches(mergeBatches(List.of(), partitionData))
-                    .setCreateTimestamp(timestamp)
-                    .setWriteTimestamp(timestamp)
-                    .build());
-        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= 
updatesPerSnapshotLimit || partitionData.stateEpoch() > 
shareStateMap.get(key).stateEpoch()) {
+        if (snapshotUpdateCount.getOrDefault(key, 0) >= 
updatesPerSnapshotLimit || partitionData.stateEpoch() > 
shareStateMap.get(key).stateEpoch()) {

Review Comment:
   Hi @AndrewJSchofield ,
   Thanks for the review.
   No, it is not possible since this method is called from 
`ShareCoordinatorShard.writeState` and 
`ShareCoordinatorShard.readStateAndMaybeUpdateLeaderEpoch`. But before calling 
it - both methods do error checking (`maybeGetWriteStateError`, 
`maybeGetReadStateError`) where they explicitly check that the `shareStateMap` 
is not empty and return "read/write on uninitialized share partition" error.
   
   New tests `testWriteFailsOnUninitializedPartition` and 
`testReadFailsOnUninitializedPartition` add proof for the same.
   
   Will add additional javadoc to clarify.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to