junrao commented on code in PR #19928:
URL: https://github.com/apache/kafka/pull/19928#discussion_r2136178608
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {

Review Comment:
   Could we move the comment above to below this line?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));

Review Comment:
   Could you run some perf tests to make sure there is no degradation for share fetch requests not reading from remote storage? If there is degradation, maybe we could only trigger this if remote storage is involved.
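
To make the retention problem described in the new code comment concrete, the sketch below models a purgatory that watches a single operation under two keys. It is a minimal illustration only: MiniPurgatory, DelayedOp, watch, and checkAndComplete are hypothetical stand-ins, not Kafka's actual DelayedOperationPurgatory API.

// Minimal, self-contained model of the multi-key watch problem: an operation
// watched under two keys stays reachable from the second key's watch list
// after being completed via the first. All names here are illustrative.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class MiniPurgatory {

    // A delayed operation that keeps a reference to fetched data. The object
    // (and the data it points to) stays reachable while any watch list links to it.
    static final class DelayedOp {
        final AtomicBoolean completed = new AtomicBoolean(false);
        final byte[] fetchedData = new byte[1024 * 1024]; // stand-in for fetched records

        boolean tryComplete() {
            // Completing answers the request, but does not unlink the operation
            // from the other keys it is watched under.
            return completed.compareAndSet(false, true);
        }
    }

    // One watch list per key, e.g. a group key and a partition key.
    private final Map<String, List<DelayedOp>> watchers = new HashMap<>();

    void watch(DelayedOp op, String... keys) {
        for (String key : keys)
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
    }

    // Try to complete operations watched under this key, and unlink any that
    // are already completed so they become garbage-collectable.
    void checkAndComplete(String key) {
        Iterator<DelayedOp> it = watchers.getOrDefault(key, List.of()).iterator();
        while (it.hasNext()) {
            DelayedOp op = it.next();
            op.tryComplete();
            if (op.completed.get())
                it.remove(); // removes from THIS key's watch list only
        }
    }

    int watched(String key) {
        return watchers.getOrDefault(key, List.of()).size();
    }

    public static void main(String[] args) {
        MiniPurgatory purgatory = new MiniPurgatory();
        DelayedOp op = new DelayedOp();
        purgatory.watch(op, "group-key", "partition-key");

        // Completing via the group key leaves the partition key's watch list
        // still holding the operation and the data it references.
        purgatory.checkAndComplete("group-key");
        System.out.println("partition-key watchers: " + purgatory.watched("partition-key")); // 1

        // Explicitly checking the other watched key unlinks the completed
        // operation, making its memory reclaimable without waiting for a purge.
        purgatory.checkAndComplete("partition-key");
        System.out.println("partition-key watchers: " + purgatory.watched("partition-key")); // 0
    }
}

Running main shows the completed operation still held under "partition-key" until checkAndComplete is also triggered on that key, which mirrors what the added replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition)) call does for each watched partition in this PR.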