AndrewJSchofield commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1729222379


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -533,178 +538,52 @@ void maybeProcessFetchQueue() {
                     shareFetchPartitionData.groupId,
                     topicIdPartition
                 );
-                SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey,
-                    k -> {
-                        long start = time.hiResClockMs();
-                        SharePartition partition = new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
-                            recordLockDurationMs, timer, time, persister);
-                        this.shareGroupMetrics.partitionLoadTime(start);
-                        return partition;
-                    });
-                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
-                // Add the share partition to the list of partitions to be fetched only if we can
-                // acquire the fetch lock on it.
-                if (sharePartition.maybeAcquireFetchLock()) {
-                    // If the share partition is already at capacity, we should not attempt to fetch.
-                    if (sharePartition.canAcquireRecords()) {
-                        topicPartitionData.put(
-                            topicIdPartition,
-                            new FetchRequest.PartitionData(
-                                topicIdPartition.topicId(),
-                                sharePartition.nextFetchOffset(),
-                                0,
-                                partitionMaxBytes,
-                                Optional.empty()
-                            )
-                        );
-                    } else {
-                        sharePartition.releaseFetchLock();
-                        log.info("Record lock partition limit exceeded for SharePartition with key {}, " +
-                            "cannot acquire more records", sharePartitionKey);
-                    }
-                }
+                partitionCacheMap.computeIfAbsent(sharePartitionKey, k -> {
+                    long start = time.hiResClockMs();
+                    SharePartition partition = new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
+                        recordLockDurationMs, timer, time, persister);
+                    this.shareGroupMetrics.partitionLoadTime(start);
+                    return partition;
+                });
             });
 
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we complete the request and
-                // will re-fetch for the client in next poll.
-                shareFetchPartitionData.future.complete(Collections.emptyMap());
-                // Though if no partitions can be locked then there must be some other request which
-                // is in-flight and should release the lock. But it's safe to release the lock as
-                // the lock on share partition already exists which facilitates correct behaviour
-                // with multiple requests from queue being processed.
-                releaseProcessFetchQueueLock();
-                if (!fetchQueue.isEmpty())
-                    maybeProcessFetchQueue();
-                return;
-            }
+            Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+            delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(
+                shareFetchPartitionData.partitionMaxBytes.keySet(),
+                shareFetchPartitionData.groupId,
+                shareFetchPartitionData.memberId));
 
-            log.trace("Fetchable share partitions data: {} with groupId: {} 
fetch params: {}",
-                topicPartitionData, shareFetchPartitionData.groupId, 
shareFetchPartitionData.fetchParams);
-
-            replicaManager.fetchMessages(
-                shareFetchPartitionData.fetchParams,
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                responsePartitionData -> {
-                    log.trace("Data successfully retrieved by replica manager: 
{}", responsePartitionData);
-                    List<Tuple2<TopicIdPartition, FetchPartitionData>> 
responseData = CollectionConverters.asJava(
-                        responsePartitionData);
-                    processFetchResponse(shareFetchPartitionData, 
responseData).whenComplete(
-                        (result, throwable) -> {
-                            if (throwable != null) {
-                                log.error("Error processing fetch response for 
share partitions", throwable);
-                                
shareFetchPartitionData.future.completeExceptionally(throwable);
-                            } else {
-                                
shareFetchPartitionData.future.complete(result);
-                            }
-                            // Releasing the lock to move ahead with the next 
request in queue.
-                            
releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, 
topicPartitionData.keySet());
-                        });
-                    return BoxedUnit.UNIT;
-                });
+            // Add the share fetch to the delayed share fetch purgatory to 
process the fetch request.
+            addDelayedShareFetch(new 
DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap),
+                delayedShareFetchWatchKeys);
 
+            // Release the lock so that other threads can process the queue.
+            releaseProcessFetchQueueLock();

Review Comment:
   Because we are just using an AtomicBoolean for the lock, we need to be squeaky clean about managing the lock status, since there's no concept of ownership from the point of view of the JVM.
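
   A minimal sketch of the discipline this implies, assuming a hypothetical standalone class (`FetchQueueProcessor`, `maybeProcess` and `processOnce` are illustrative names, not the actual SharePartitionManager code; the real code releases the lock at other points, e.g. after adding the delayed share fetch, but the same rule holds): because an AtomicBoolean carries no notion of an owning thread, every successful acquire must be paired with exactly one release on every code path, or the queue stalls silently.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical illustration only, not the actual SharePartitionManager code.
   public class FetchQueueProcessor {

       // Non-reentrant "lock": true means some thread is currently draining the queue.
       // The JVM does not record which thread set the flag, so ownership is by convention only.
       private final AtomicBoolean processFetchQueueLock = new AtomicBoolean(false);

       private boolean acquireProcessFetchQueueLock() {
           // Only the single thread that flips false -> true wins the lock.
           return processFetchQueueLock.compareAndSet(false, true);
       }

       private void releaseProcessFetchQueueLock() {
           // Any thread may call this; a double release or a release without a prior
           // acquire is accepted silently, which is why call sites must be exact.
           processFetchQueueLock.set(false);
       }

       // Example caller: one successful acquire is matched by exactly one release,
       // even if processing throws.
       public void maybeProcess(Runnable processOnce) {
           if (!acquireProcessFetchQueueLock()) {
               return; // another thread already holds the "lock"
           }
           try {
               processOnce.run();
           } finally {
               releaseProcessFetchQueueLock();
           }
       }
   }
   ```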


