adixitconfluent commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1741212960
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########

```diff
@@ -564,16 +592,29 @@ void maybeProcessFetchQueue() {
                    );
                } else {
                    sharePartition.releaseFetchLock();
-                    log.info("Record lock partition limit exceeded for SharePartition with key {}, " +
-                        "cannot acquire more records", sharePartitionKey);
                }
            }
        });
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request and
-            // will re-fetch for the client in next poll.
+        if (shareFetchPartitionData.partitionMaxBytes.isEmpty()) {
+            // If there are no partitions to fetch then complete the future with an empty map.
            shareFetchPartitionData.future.complete(Collections.emptyMap());
+            // Release the lock so that other threads can process the queue.
+            releaseProcessFetchQueueLock();
+            if (!fetchQueue.isEmpty())
+                maybeProcessFetchQueue();
+            return;
+        }
+        if (topicPartitionData.isEmpty()) {
+            // No locks for any of the share partitions in the fetch request could be acquired.
+            Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+            shareFetchPartitionData.partitionMaxBytes.keySet().forEach(
+                topicIdPartition -> delayedShareFetchWatchKeys.add(
+                    new DelayedShareFetchKey(topicIdPartition, shareFetchPartitionData.groupId)));
+
+            // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
+            addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
```

Review Comment:
> My suggestion is to not acquire the lock in SharePartitionManager and do that in delayedShareFetchPurgatory.tryCompleteElseWatch() (through DelayedShareFetch.tryComplete()) instead.

Hi @junrao, I don't think we can do that, because in `SharePartitionManager` we call `replicaManager.fetchMessages` precisely to avoid a tight loop between the broker and the share consumer, whereas `DelayedShareFetch` calls `replicaManager.readFromLog` in its `onComplete()`.
So, if `tryComplete` did acquire the partitions, `replicaManager.readFromLog` would execute immediately after `tryComplete`, which would cause a tight loop between the broker and the share consumer.
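To illustrate the concern, here is a minimal, hypothetical sketch of the `DelayedOperation`-style flow being discussed. The class and method names mirror the discussion (`tryComplete`, `onComplete`, `tryCompleteElseWatch`) but this is not the real Kafka implementation; `DelayedShareFetchSketch`, `locksAcquirable`, and `readFromLogCalls` are invented stand-ins. The point it shows: if partition locks are acquired inside `tryComplete`, the completion callback (where the log read would live) runs synchronously on the caller's thread, so the consumer gets an immediate response and re-polls at once.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified model of the DelayedOperation /
// DelayedShareFetch interaction discussed above; NOT the real implementation.
abstract class DelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    /** Returns true if the operation was completed by this call. */
    abstract boolean tryComplete();

    /** Completion callback; in the discussion, this is where
     *  replicaManager.readFromLog would happen. */
    abstract void onComplete();

    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();   // note: runs synchronously on the caller's thread
            return true;
        }
        return false;
    }

    /** Simplified tryCompleteElseWatch: complete now, or watch (elided here). */
    final boolean tryCompleteElseWatch() {
        return tryComplete();
    }
}

class DelayedShareFetchSketch extends DelayedOperation {
    private final boolean locksAcquirable; // stand-in for partition fetch locks
    int readFromLogCalls = 0;              // counts synchronous "log reads"

    DelayedShareFetchSketch(boolean locksAcquirable) {
        this.locksAcquirable = locksAcquirable;
    }

    @Override
    boolean tryComplete() {
        // If the share-partition locks are acquired inside tryComplete,
        // the operation completes immediately...
        return locksAcquirable && forceComplete();
    }

    @Override
    void onComplete() {
        // ...so the readFromLog stand-in runs right away on the request
        // handler thread, and the (possibly empty) response goes straight
        // back to the share consumer, which polls again: the tight loop.
        readFromLogCalls++;
    }

    public static void main(String[] args) {
        DelayedShareFetchSketch immediate = new DelayedShareFetchSketch(true);
        System.out.println("completed immediately: " + immediate.tryCompleteElseWatch()
                + ", readFromLog calls: " + immediate.readFromLogCalls);

        DelayedShareFetchSketch parked = new DelayedShareFetchSketch(false);
        System.out.println("completed immediately: " + parked.tryCompleteElseWatch()
                + ", readFromLog calls: " + parked.readFromLogCalls);
    }
}
```

By contrast, going through `replicaManager.fetchMessages` lets the request wait in the fetch purgatory until data (or a timeout) arrives, which is what breaks the loop.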