junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1831742440


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,42 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
-
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
-            boolean completedByMe = forceComplete();
-            // If invocation of forceComplete is not successful, then that means the request is already completed
-            // hence release the acquired locks.
-            if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsAcquired = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
+                            "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+                        sharePartitions.keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
+                        "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+                    sharePartitions.keySet());
             }
-            return completedByMe;
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
+            releasePartitionLocks(topicPartitionData.keySet());

Review Comment:
   Should we reset partitionsAcquired and partitionsAlreadyFetched?
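
   A minimal sketch of what resetting the two fields could look like, assuming an exception can be thrown after they have been assigned. `ResetOnFailureSketch`, the `failInForceComplete` flag, and the String-keyed maps are hypothetical stand-ins, not the PR's actual types or behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for DelayedShareFetch: String keys replace
// TopicIdPartition and plain values replace PartitionData / LogReadResult.
class ResetOnFailureSketch {
    private Map<String, Integer> partitionsAcquired = new LinkedHashMap<>();
    private Map<String, String> partitionsAlreadyFetched = new LinkedHashMap<>();
    private int releaseCalls = 0;

    // failInForceComplete simulates an exception thrown after both fields are assigned.
    boolean tryComplete(Map<String, Integer> acquired, Map<String, String> fetched,
                        boolean failInForceComplete) {
        try {
            partitionsAcquired = acquired;
            partitionsAlreadyFetched = fetched;
            if (failInForceComplete) {
                throw new IllegalStateException("simulated failure");
            }
            return true;
        } catch (Exception e) {
            releasePartitionLocks(acquired);
            // Reset the cached state so a later completion path cannot reuse
            // partitions whose locks were just released.
            partitionsAcquired = new LinkedHashMap<>();
            partitionsAlreadyFetched = new LinkedHashMap<>();
            return false;
        }
    }

    private void releasePartitionLocks(Map<String, Integer> partitions) {
        releaseCalls++; // placeholder for the real lock release
    }

    Map<String, Integer> partitionsAcquired() { return partitionsAcquired; }
    Map<String, String> partitionsAlreadyFetched() { return partitionsAlreadyFetched; }
    int releaseCalls() { return releaseCalls; }

    public static void main(String[] args) {
        ResetOnFailureSketch sketch = new ResetOnFailureSketch();
        Map<String, Integer> acquired = new LinkedHashMap<>();
        acquired.put("topicA-0", 100);
        Map<String, String> fetched = new LinkedHashMap<>();
        fetched.put("topicA-0", "cached read");
        boolean completed = sketch.tryComplete(acquired, fetched, true);
        System.out.println("completed=" + completed
            + ", acquired=" + sketch.partitionsAcquired()
            + ", fetched=" + sketch.partitionsAlreadyFetched());
    }
}
```

   Without the reset, the fields would still point at partitions whose locks are no longer held, which is the situation the question above is probing.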



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +228,135 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
             }
+        });
+        if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        return readFromLog(partitionsMissingFetchOffsetMetadata);
+    }
+
+    private void maybeUpdateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} errored out for topic partition {}",
+                    replicaManagerLogReadResult, topicIdPartition);
+                continue;
+            }
+            sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {

Review Comment:
   Let's add a comment noting that the minBytes estimation currently assumes the
common case where all fetched data is acquirable.
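
   To illustrate the assumption being flagged, here is a rough sketch of a position-based estimate. It is not the PR's implementation: the `Positions` class and the idea of summing `endPosition - fetchPosition` per partition are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

class MinBytesEstimateSketch {
    // Hypothetical per-partition view: byte position of the fetch offset and
    // byte position of the log end, assumed to be comparable (same segment).
    static final class Positions {
        final long fetchPosition;
        final long endPosition;

        Positions(long fetchPosition, long endPosition) {
            this.fetchPosition = fetchPosition;
            this.endPosition = endPosition;
        }
    }

    static boolean isMinBytesSatisfied(List<Positions> partitions, long minBytes) {
        long accumulated = 0;
        for (Positions p : partitions) {
            // Counts every byte past the fetch position, including records that
            // may later turn out not to be acquirable by this share fetch.
            accumulated += Math.max(0L, p.endPosition - p.fetchPosition);
        }
        return accumulated >= minBytes;
    }

    public static void main(String[] args) {
        List<Positions> partitions = Arrays.asList(
            new Positions(0L, 300L),
            new Positions(100L, 400L));
        System.out.println(isMinBytesSatisfied(partitions, 500L));
    }
}
```

   Under this kind of estimate, any bytes that are not acquirable are still counted, so the check can pass even though fewer than minBytes end up being returned. That is the limitation the requested code comment would document.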



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -54,18 +58,25 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
+    // sharePartitions is a subset of shareFetchData.
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
 
     DelayedShareFetch(
             ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            SharePartitionManager sharePartitionManager) {
+            SharePartitionManager sharePartitionManager,
+            Map<TopicIdPartition, SharePartition> sharePartitions) {

Review Comment:
   If ordering is important, should we explicitly declare it as LinkedHashMap
everywhere it is used?
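
   A small sketch of the idea, assuming insertion order is the ordering that matters. `OrderedSharePartitionsSketch` and `processingOrder` are hypothetical names, and String keys stand in for `TopicIdPartition`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

class OrderedSharePartitionsSketch {
    // Declaring the parameter as LinkedHashMap rather than Map makes the
    // ordering requirement part of the signature: passing a plain HashMap
    // is a compile-time error instead of a silent ordering change.
    static List<String> processingOrder(LinkedHashMap<String, String> sharePartitions) {
        return new ArrayList<>(sharePartitions.keySet());
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put("topicB-0", "share partition for topicB-0");
        sharePartitions.put("topicA-1", "share partition for topicA-1");
        sharePartitions.put("topicA-0", "share partition for topicA-0");
        // Iteration follows insertion order, not key order.
        System.out.println(processingOrder(sharePartitions));
    }
}
```

   The trade-off is a more concrete type in the API: callers lose the freedom to pass any `Map`, but the compiler enforces the ordering contract.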


