dajac commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1873001493


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -296,6 +296,102 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr
         return new CoordinatorResult<>(Collections.singletonList(record), responseData);
     }
 
+    /**
+     * Method reads data from the soft state and if needed updates the leader epoch.
+     * It can happen that a read state call for a share partition has a higher leaderEpoch
+     * value than seen so far.
+     * In case an update is not required, empty record list will be generated along with a success response.
+     * @param request - represents ReadShareGroupStateRequestData
+     * @return CoordinatorResult object
+     */
+    public CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> readStateAndMaybeUpdateLeaderEpoch(
+        ReadShareGroupStateRequestData request
+    ) {
+        // Only one key will be there in the request by design.
+        Optional<ReadShareGroupStateResponseData> error = maybeGetReadStateError(request);
+        if (error.isPresent()) {
+            return new CoordinatorResult<>(Collections.emptyList(), error.get());
+        }
+
+        ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0);
+        ReadShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+
+        Uuid topicId = topicData.topicId();
+        int partitionId = partitionData.partition();
+        int leaderEpoch = partitionData.leaderEpoch();
+        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
+
+        ReadShareGroupStateResponseData responseData = null;
+
+        if (!shareStateMap.containsKey(key)) {
+            // Leader epoch update might be needed
+            responseData = ReadShareGroupStateResponse.toResponseData(
+                topicId,
+                partitionId,
+                PartitionFactory.UNINITIALIZED_START_OFFSET,
+                PartitionFactory.DEFAULT_STATE_EPOCH,
+                Collections.emptyList()
+            );
+        } else if (shareStateMap.get(key) == null) {

Review Comment:
   I have a question regarding this branch. If I recall correctly,
   TimelineHashMap does not allow null keys or values, so I am not sure we
   really need the null check. I was looking into it because the key is looked
   up three times in a row here on the happy path.
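
A minimal sketch of the single-lookup alternative, for illustration only. It assumes the map never stores null values, so a null result from `get` can only mean the key is absent. A plain `java.util.HashMap` with `String` keys and `Integer` values stands in for the shard's `shareStateMap`, and the class and helper names (`SingleLookupSketch`, `leaderEpochOrDefault`) are hypothetical, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleLookupSketch {
    // Hypothetical helper: returns the stored value, or -1 when the key is absent.
    // The map parameter is a stand-in for the coordinator's soft state.
    static int leaderEpochOrDefault(Map<String, Integer> stateMap, String key) {
        // One lookup replaces the containsKey + get + get sequence.
        Integer state = stateMap.get(key);
        if (state == null) {
            // Assumption: null values are never stored, so null means "no entry".
            return -1;
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, Integer> stateMap = new HashMap<>();
        stateMap.put("group:topic:0", 5);
        System.out.println(leaderEpochOrDefault(stateMap, "group:topic:0"));
        System.out.println(leaderEpochOrDefault(stateMap, "group:topic:1"));
    }
}
```

With this shape the separate `containsKey` branch and the `== null` branch collapse into one check on the value returned by `get`.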


