junrao commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1741135040


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {
                         );
                     } else {
                         sharePartition.releaseFetchLock();
-                        log.info("Record lock partition limit exceeded for 
SharePartition with key {}, " +
-                            "cannot acquire more records", sharePartitionKey);
                     }
                 }
             });
 
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we 
complete the request and
-                // will re-fetch for the client in next poll.
+            if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
+                // If there are no partitions to fetch then complete the 
future with an empty map.
                 
shareFetchPartitionData.future.complete(Collections.emptyMap());
+                // Release the lock so that other threads can process the 
queue.
+                releaseProcessFetchQueueLock();
+                if (!fetchQueue.isEmpty())
+                    maybeProcessFetchQueue();
+                return;
+            }
+            if (topicPartitionData.isEmpty()) {
+                // No locks for any of the share partitions in the fetch 
request could be acquired.
+                Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+                shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
+                    topicIdPartition -> delayedShareFetchWatchKeys.add(
+                        new DelayedShareFetchKey(topicIdPartition, 
shareFetchPartitionData.groupId)));
+
+                // Add the share fetch to the delayed share fetch purgatory to 
process the fetch request.
+                addDelayedShareFetch(new 
DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),

Review Comment:
   I mean that `DelayedShareFetch.tryComplete()` needs to implement the logic 
for acquiring the partition level lock for both the initial 
`delayedShareFetchPurgatory.tryCompleteElseWatch()` call and the subsequent 
`delayedShareFetchPurgatory.checkAndComplete()` calls. Instead of implementing 
the lock acquiring logic in both `SharePartitionManager` and 
`DelayedShareFetch`, could we just do it once in `DelayedShareFetch`? Then, we 
don't need the `isTryingForFirstTime` flag.
   
   > There is no method in DelayedOperationPurgatory by which we can do an 
operation like add an operation and keys associated to it and just watch it 
without trying. 
   
   That's true. My suggestion is to not acquire the lock in 
`SharePartitionManager` and do that in 
`delayedShareFetchPurgatory.tryCompleteElseWatch()` (through 
`DelayedShareFetch.tryComplete()`) instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to