junrao commented on code in PR #19928:
URL: https://github.com/apache/kafka/pull/19928#discussion_r2136178608


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {

Review Comment:
   Could we move the comment above to below this line?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -808,9 +809,18 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
         // then we should check if there is a pending share fetch request for the topic-partition and complete it.
         // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
         // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, same operation might be
+            // completed and can contain references to data fetched. Hence, if the operation is not
+            // removed from other watched keys then there can be a memory leak. The removal of the
+            // operation is dependent on the purge task by DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration
+            // {@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchPartitionKey(topicIdPartition));

Review Comment:
   Could you run some perf tests to make sure there is no degradation for share 
fetch requests not reading from remote storage? If there is degradation, maybe 
we could only trigger this if remote storage is involved. 
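   To illustrate the retention issue the diff comment describes, here is a minimal, hypothetical sketch (not Kafka's actual `DelayedOperationPurgatory` implementation; all class and method names below are invented for illustration). An operation watched under two keys is only pruned from the watch list of the key that `checkAndComplete` is invoked with, so completing it under the group key alone leaves a reference alive under the partition key until a purge pass runs:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Toy model of a purgatory watching one operation under multiple keys.
   public class MultiKeyWatchSketch {
       // A toy delayed operation holding a reference to fetched data.
       static class DelayedOp {
           final byte[] fetchedData = new byte[1024]; // stand-in for fetched records
           boolean completed = false;
           boolean tryComplete() { completed = true; return true; }
       }
   
       // Toy watch lists: key -> operations watched under that key.
       static final Map<String, List<DelayedOp>> watchers = new HashMap<>();
   
       static void watch(DelayedOp op, String... keys) {
           for (String k : keys)
               watchers.computeIfAbsent(k, x -> new ArrayList<>()).add(op);
       }
   
       // Only prunes the watch list of the key it is called with, mirroring
       // how checkAndComplete operates per key.
       static void checkAndComplete(String key) {
           List<DelayedOp> ops = watchers.get(key);
           if (ops != null)
               ops.removeIf(op -> op.completed || op.tryComplete());
       }
   
       public static void main(String[] args) {
           DelayedOp op = new DelayedOp();
           // Watched under both a group key and a partition key, as in the PR.
           watch(op, "group-key", "partition-key");
   
           // Completing under only the group key leaves a live reference
           // (and its fetched data) under the partition key.
           checkAndComplete("group-key");
           System.out.println("after group key: partition watchers = "
                   + watchers.get("partition-key").size());
   
           // Checking the partition key as well drops the last reference.
           checkAndComplete("partition-key");
           System.out.println("after partition key: partition watchers = "
                   + watchers.get("partition-key").size());
       }
   }
   ```
   
   This is also why the perf question above matters: the extra `checkAndComplete` per partition key is pure overhead on the fast path, trading CPU for earlier memory release.
   
   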



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
