[https://issues.apache.org/jira/browse/KAFKA-17424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877271#comment-17877271]
Ajit Singh edited comment on KAFKA-17424 at 8/28/24 12:37 PM:
--------------------------------------------------------------
[~gharris1727] As for evidence, what else should I provide apart from the stack trace?
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:635)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space
I also ran this in debug mode to get to the exact line where this happens:
WorkerSinkTask.java:605: task.put(new ArrayList<>(messageBatch));
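For reference, a minimal sketch of the copy-then-clear pattern that line sits in, paraphrased from the behaviour described in this ticket (not the verbatim Kafka source; the sink task is reduced to a Consumer for brevity):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Paraphrase of the pattern behind WorkerSinkTask line 605 as described in
// this ticket (not the verbatim Kafka source). The worker keeps a final
// buffer and hands the task a defensive copy, so two full batches coexist
// on the heap until clear() runs.
class CopyThenClearSketch<R> {
    private final List<R> messageBatch = new ArrayList<>();

    void deliverMessages(Consumer<List<R>> task) {
        task.accept(new ArrayList<>(messageBatch)); // copy: peak heap is ~2x one batch
        messageBatch.clear();                       // original freed only after put() returns
    }
}
{code}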
> Memory optimisation for Kafka-connect
> -------------------------------------
>
> Key: KAFKA-17424
> URL: https://issues.apache.org/jira/browse/KAFKA-17424
> Project: Kafka
> Issue Type: Improvement
> Components: connect
> Affects Versions: 3.8.0
> Reporter: Ajit Singh
> Priority: Major
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> When Kafka Connect gives the sink task its own copy of the List<SinkRecord>,
> RAM utilisation shoots up: at that moment two full lists exist side by side,
> and the original list is only cleared after the sink worker finishes the
> current batch.
>
> Originally the list is declared final and a copy of it is handed to the sink
> task, since tasks can be custom and we let the user process the copy however
> they want without any risk. But one of the most popular uses of Kafka Connect
> is OLTP-to-OLAP replication, and during initial copying/snapshots a lot of
> data is generated rapidly, which fills the list to its maximum batch size and
> makes us prone to OutOfMemoryError. The only lifecycle of the list is: get
> filled -> cloned for the sink -> size taken -> cleared -> repeat. So I take
> the size of the list before handing the original list to the sink task, and
> after the sink has performed its operations I set list = new ArrayList<>()
> (see the sketch after this description). I did not use clear(), just in case
> the sink task has set our list to null.
> There is a time-vs-memory trade-off: in the original approach the JVM does
> not have to spend time finding free memory, while in the new approach the
> JVM has to allocate a new list by finding free memory addresses, but this
> results in more free memory overall.
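> A minimal sketch of the proposed change (hypothetical, not the actual patch;
> names mirror the existing code, and the sink task is again reduced to a
> Consumer for brevity):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.function.Consumer;
>
> // Sketch of the proposal: hand the task the original list, capture its size
> // beforehand, and reassign a fresh list instead of clearing, so only one
> // full batch is ever live at a time. Hypothetical code, not a committed fix.
> class HandOffThenReassignSketch<R> {
>     private List<R> messageBatch = new ArrayList<>(); // no longer final
>
>     int deliverMessages(Consumer<List<R>> task) {
>         int batchSize = messageBatch.size(); // size taken before the hand-off
>         task.accept(messageBatch);           // no defensive copy
>         messageBatch = new ArrayList<>();    // reassign instead of clear(), in
>                                              // case the task kept or nulled it
>         return batchSize;                    // e.g. for batch-size metrics
>     }
> }
> {code}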