AndrewJSchofield commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1729222379
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -533,178 +538,52 @@ void maybeProcessFetchQueue() { shareFetchPartitionData.groupId, topicIdPartition ); - SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey, - k -> { - long start = time.hiResClockMs(); - SharePartition partition = new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount, - recordLockDurationMs, timer, time, persister); - this.shareGroupMetrics.partitionLoadTime(start); - return partition; - }); - int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0); - // Add the share partition to the list of partitions to be fetched only if we can - // acquire the fetch lock on it. - if (sharePartition.maybeAcquireFetchLock()) { - // If the share partition is already at capacity, we should not attempt to fetch. - if (sharePartition.canAcquireRecords()) { - topicPartitionData.put( - topicIdPartition, - new FetchRequest.PartitionData( - topicIdPartition.topicId(), - sharePartition.nextFetchOffset(), - 0, - partitionMaxBytes, - Optional.empty() - ) - ); - } else { - sharePartition.releaseFetchLock(); - log.info("Record lock partition limit exceeded for SharePartition with key {}, " + - "cannot acquire more records", sharePartitionKey); - } - } + partitionCacheMap.computeIfAbsent(sharePartitionKey, k -> { + long start = time.hiResClockMs(); + SharePartition partition = new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount, + recordLockDurationMs, timer, time, persister); + this.shareGroupMetrics.partitionLoadTime(start); + return partition; + }); }); - if (topicPartitionData.isEmpty()) { - // No locks for share partitions could be acquired, so we complete the request and - // will re-fetch for the client in next poll. - shareFetchPartitionData.future.complete(Collections.emptyMap()); - // Though if no partitions can be locked then there must be some other request which - // is in-flight and should release the lock. But it's safe to release the lock as - // the lock on share partition already exists which facilitates correct behaviour - // with multiple requests from queue being processed. - releaseProcessFetchQueueLock(); - if (!fetchQueue.isEmpty()) - maybeProcessFetchQueue(); - return; - } + Set<Object> delayedShareFetchWatchKeys = new HashSet<>(); + delayedShareFetchWatchKeys.add(new DelayedShareFetchKey( + shareFetchPartitionData.partitionMaxBytes.keySet(), + shareFetchPartitionData.groupId, + shareFetchPartitionData.memberId)); - log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams); - - replicaManager.fetchMessages( - shareFetchPartitionData.fetchParams, - CollectionConverters.asScala( - topicPartitionData.entrySet().stream().map(entry -> - new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList()) - ), - QuotaFactory.UnboundedQuota$.MODULE$, - responsePartitionData -> { - log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData); - List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData = CollectionConverters.asJava( - responsePartitionData); - processFetchResponse(shareFetchPartitionData, responseData).whenComplete( - (result, throwable) -> { - if (throwable != null) { - log.error("Error processing fetch response for share partitions", throwable); - shareFetchPartitionData.future.completeExceptionally(throwable); - } else { - shareFetchPartitionData.future.complete(result); - } - // Releasing the lock to move ahead with the next request in queue. - releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet()); - }); - return BoxedUnit.UNIT; - }); + // Add the share fetch to the delayed share fetch purgatory to process the fetch request. + addDelayedShareFetch(new DelayedShareFetch(shareFetchPartitionData, replicaManager, partitionCacheMap), + delayedShareFetchWatchKeys); + // Release the lock so that other threads can process the queue. + releaseProcessFetchQueueLock(); Review Comment: Because we are just using an AtomicBoolean for the lock, we do need to be squeaky clean on managing the lock status because there's no concept of ownership from the point of view of the JVM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org