apoorvmittal10 commented on code in PR #19928: URL: https://github.com/apache/kafka/pull/19928#discussion_r2136306887
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));

Review Comment:
   I have added the test results [here](https://github.com/apache/kafka/pull/19928#issuecomment-2956678794). Also monitored the memory consumption in jconsole, which looks stable. There is no degradation when not reading from remote storage.
   Even without this fix, the issue does not occur if a producer runs in parallel with the share consumer, because a produce request also triggers the purgatory to check the watch keys per topic-partition.
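
   To illustrate the leak described in the code comment, here is a minimal sketch of an operation watched under two keys. It is a toy model, not Kafka's `DelayedOperationPurgatory`: the `ToyPurgatory` class, its methods, and the key strings are all hypothetical, and it assumes every check completes the operation. It only shows that checking one key leaves the completed operation (and the data it references) reachable from the other key until that key is checked as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only; names are hypothetical and this is
// not how Kafka's DelayedOperationPurgatory is implemented.
public class ToyPurgatory {

    /** A delayed operation that holds a reference to fetched data. */
    public static final class Op {
        boolean completed = false;
        // Stands in for the records referenced by a completed share fetch.
        final byte[] fetchedData = new byte[1024];
    }

    // Watch key -> operations registered under that key.
    private final Map<String, List<Op>> watchers = new HashMap<>();

    /** Registers the same operation under every key it is watched on. */
    public void watch(Op op, String... keys) {
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        }
    }

    /**
     * Completes the operations under one key (assumption: completion always
     * succeeds) and drops completed operations from that key's list only.
     * Lists under other keys are left untouched.
     */
    public void checkAndComplete(String key) {
        List<Op> ops = watchers.get(key);
        if (ops == null) {
            return;
        }
        for (Op op : ops) {
            op.completed = true;
        }
        ops.removeIf(op -> op.completed);
        if (ops.isEmpty()) {
            watchers.remove(key);
        }
    }

    /** Number of operations still referenced under the given key. */
    public int watched(String key) {
        List<Op> ops = watchers.get(key);
        return ops == null ? 0 : ops.size();
    }

    public static void main(String[] args) {
        ToyPurgatory purgatory = new ToyPurgatory();
        Op op = new Op();
        purgatory.watch(op, "group-key", "partition-key");

        // Checking only the group key completes the operation, but the
        // partition key still references it (and its fetched data).
        purgatory.checkAndComplete("group-key");
        System.out.println(purgatory.watched("partition-key")); // prints 1

        // Checking the partition key as well releases the last reference.
        purgatory.checkAndComplete("partition-key");
        System.out.println(purgatory.watched("partition-key")); // prints 0
    }
}
```

   In this sketch the second `checkAndComplete` call plays the role of the added `completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition))` call in the diff; without it, the reference would only go away when something else checks or purges that key.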