[
https://issues.apache.org/jira/browse/KAFKA-19763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028323#comment-18028323
]
Luke Chen commented on KAFKA-19763:
-----------------------------------
This patch fixes the problem, but it slows down the read throughput because it
takes time to clone the buffer. There may be a better solution.
{code:java}
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -22,6 +22,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.server.LogReadResult
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
@@ -108,6 +109,7 @@ class DelayedRemoteFetch(remoteFetchTasks:
util.Map[TopicIdPartition, Future[Voi
*/
override def onComplete(): Unit = {
val fetchPartitionData = localReadResults.map { case (tp, result) =>
+
val remoteFetchResult = remoteFetchResults.get(tp)
if (remoteFetchInfos.containsKey(tp)
&& remoteFetchResult.isDone
@@ -121,7 +123,8 @@ class DelayedRemoteFetch(remoteFetchTasks:
util.Map[TopicIdPartition, Future[Voi
result.error,
result.highWatermark,
result.leaderLogStartOffset,
- info.records,
+ // clone the record buffer to release the memory
+
MemoryRecords.readableRecords(info.records.asInstanceOf[MemoryRecords].buffer()),
Optional.empty(),
if (result.lastStableOffset.isPresent)
OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(),
info.abortedTransactions,
@@ -132,7 +135,8 @@ class DelayedRemoteFetch(remoteFetchTasks:
util.Map[TopicIdPartition, Future[Voi
tp -> result.toFetchPartitionData(false)
}
}
-
+ // clear the map to avoid memory leak
+ remoteFetchResults.clear()
responseCallback(fetchPartitionData)
}
}{code}
> Parallel remote reads causes memory leak in broker
> --------------------------------------------------
>
> Key: KAFKA-19763
> URL: https://issues.apache.org/jira/browse/KAFKA-19763
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Blocker
> Fix For: 4.2.0
>
> Attachments: RemoteReadMemoryLeakReproducer.java, Screenshot
> 2025-10-07 at 8.25.45 PM.png
>
>
> Broker heap memory gets filled up and throws OOM error when remote reads are
> triggered for multiple partitions within a FETCH request.
> Steps to reproduce:
> 1. Start a one node broker and configure LocalTieredStorage as remote
> storage.
> 2. Create a topic with 5 partitions.
> 3. Produce message and ensure that few segments are uploaded to remote.
> 4. Start a consumer to read from those 5 partitions. Seek the offset to
> beginning for 4 partitions and to end for 1 partition. This is to simulate
> that the FETCH request read from both remote-log and local-log.
> 5. The broker crashes with the OOM error.
> 6. The DelayedRemoteFetch / RemoteLogReadResult references are being held by
> the purgatory, so the broker crashes.
> cc [~showuon]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)