apoorvmittal10 commented on code in PR #19928:
URL: https://github.com/apache/kafka/pull/19928#discussion_r2136306887


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));

Review Comment:
   I have added the test results [here](https://github.com/apache/kafka/pull/19928#issuecomment-2956678794). I also monitored the memory consumption in JConsole, and it looks stable. There is no degradation when not reading from remote storage.
   
   Even without this fix, the issue cannot occur when a producer runs in parallel with the share consumer, because produce also triggers the purgatory to check the watch keys per topic-partition.
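   
   To make the leak mechanism concrete, here is a minimal, self-contained sketch. It is a toy model, not Kafka's actual `DelayedOperationPurgatory` API: `ToyPurgatory`, `Operation`, and the key names are all illustrative. It shows how an operation watched under two keys stays reachable from the second key's watch list after completion, until that key is also checked (or a purge runs):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class ToyPurgatory {
   
       // Stand-in for a delayed operation that holds references to fetched data.
       static class Operation {
           final byte[] fetchedData = new byte[1024 * 1024]; // ~1 MiB of "fetched records"
           boolean completed = false;
       }
   
       // One watch list per key; an operation watched under N keys appears in N lists.
       private final Map<String, List<Operation>> watchLists = new HashMap<>();
   
       void watch(Operation op, String... keys) {
           for (String key : keys) {
               watchLists.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
           }
       }
   
       // Completes operations watched under this key and unlinks completed ones
       // from THIS key's watch list only; other keys' lists still reference them.
       void checkAndComplete(String key) {
           List<Operation> ops = watchLists.getOrDefault(key, new ArrayList<>());
           ops.forEach(op -> op.completed = true);
           ops.removeIf(op -> op.completed);
       }
   
       int watchedCount(String key) {
           return watchLists.getOrDefault(key, new ArrayList<>()).size();
       }
   
       public static void main(String[] args) {
           ToyPurgatory purgatory = new ToyPurgatory();
           Operation fetch = new Operation();
   
           // A DelayedShareFetch-style operation is watched under both a group key
           // and a partition key.
           purgatory.watch(fetch, "groupKey", "partitionKey");
   
           // Completing via the group key alone finishes the operation...
           purgatory.checkAndComplete("groupKey");
           // ...but the partition key's watch list still pins it (and its data)
           // until a purge task or another check runs.
           System.out.println("still watched: " + purgatory.watchedCount("partitionKey")); // 1
   
           // Checking the second key as well frees the reference promptly.
           purgatory.checkAndComplete("partitionKey");
           System.out.println("still watched: " + purgatory.watchedCount("partitionKey")); // 0
       }
   }
   ```
   
   This is the same shape as the patch above: completing against only the `DelayedShareFetchGroupKey` leaves the finished operation (and the data it references) on the per-partition watch list, which is why the second `completeDelayedShareFetchRequest` call with `DelayedShareFetchPartitionKey` is added.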


