AndrewJSchofield commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1871201141


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -296,6 +296,94 @@ public CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> wr
         return new CoordinatorResult<>(Collections.singletonList(record), 
responseData);
     }
 
+    /**
+     * Method reads data from the soft state and if needed updates the leader 
epoch.
+     * It can happen that a read state call for a share partition has a higher 
leaderEpoch
+     * value than seen so far.
+     * In case an update is not required, empty record list will be generated 
along with a success response.
+     * @param request - represents ReadShareGroupStateRequestData
+     * @return CoordinatorResult object
+     */
+    public CoordinatorResult<ReadShareGroupStateResponseData, 
CoordinatorRecord> maybeUpdateLeaderEpochAndRead(
+        ReadShareGroupStateRequestData request
+    ) {
+        // only one key will be there in the request by design
+        Optional<ReadShareGroupStateResponseData> error = 
maybeGetReadStateError(request);
+        if (error.isPresent()) {
+            return new CoordinatorResult<>(Collections.emptyList(), 
error.get());
+        }
+
+        ReadShareGroupStateRequestData.ReadStateData topicData = 
request.topics().get(0);
+        ReadShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+        int leaderEpoch = partitionData.leaderEpoch();
+        SharePartitionKey key = 
SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
+
+        ReadShareGroupStateResponseData responseData = null;
+
+        if (!shareStateMap.containsKey(key)) {
+            responseData = ReadShareGroupStateResponse.toResponseData(
+                topicId,
+                partitionId,
+                PartitionFactory.UNINITIALIZED_START_OFFSET,
+                PartitionFactory.DEFAULT_STATE_EPOCH,
+                Collections.emptyList()
+            );
+        } else if (shareStateMap.get(key) == null) {
+            // no state information was not found but key present

Review Comment:
   nit: double negative. I think you mean that no share state information was 
found but key present



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -296,6 +296,94 @@ public CoordinatorResult<WriteShareGroupStateResponseData, 
CoordinatorRecord> wr
         return new CoordinatorResult<>(Collections.singletonList(record), 
responseData);
     }
 
+    /**
+     * Method reads data from the soft state and if needed updates the leader 
epoch.
+     * It can happen that a read state call for a share partition has a higher 
leaderEpoch
+     * value than seen so far.
+     * In case an update is not required, empty record list will be generated 
along with a success response.
+     * @param request - represents ReadShareGroupStateRequestData
+     * @return CoordinatorResult object
+     */
+    public CoordinatorResult<ReadShareGroupStateResponseData, 
CoordinatorRecord> maybeUpdateLeaderEpochAndRead(

Review Comment:
   Personally, I think this is `readStateAndMaybeUpdateLeaderEpoch`. It's the 
read method, that is also able to update the leader epoch if necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to