[ https://issues.apache.org/jira/browse/KAFKA-17424 ]


    Ajit Singh deleted comment on KAFKA-17424:
    ------------------------------------

was (Author: JIRAUSER306598):
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:635)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space

> Memory optimisation for Kafka-connect
> -------------------------------------
>
>                 Key: KAFKA-17424
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17424
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect
>    Affects Versions: 3.8.0
>            Reporter: Ajit Singh
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When Kafka Connect gives a sink task its own copy of the List<SinkRecord>, 
> RAM utilisation shoots up: at that moment there are two lists in memory, and 
> the original list is cleared only after the sink task finishes the current 
> batch.
>  
> Originally the list is declared final and a copy of it is provided to the 
> sink task, because sink tasks can be custom and we let users process the 
> records however they want without any risk. But one of the most popular uses 
> of Kafka Connect is OLTP to OLAP replication, and during initial 
> copying/snapshots a lot of data is generated rapidly, which fills the list to 
> its maximum batch size and makes us prone to "Out of Memory" errors. The only 
> use of the list is: get filled -> cloned for sink -> get size -> cleared -> 
> repeat. So I now take the size of the list before giving the original list to 
> the sink task and, after the sink has performed its operations, set 
> list = new ArrayList<>(). I did not use clear() in case the sink task has set 
> our list to null.
> There is a time vs memory trade-off. 
> In the original approach the JVM does not have to spend time finding free 
> memory. In the new approach the JVM has to create a new list by finding free 
> memory addresses, but this results in more free memory.
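
The two approaches described in the issue can be sketched as follows. This is a minimal illustration, not the Kafka Connect implementation: the class BatchHandoffSketch, its methods, and the use of String records with a Consumer standing in for the sink task are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a worker that buffers records for a sink.
// None of these names come from the Kafka code base.
class BatchHandoffSketch {
    // Not final, so it can be replaced after each delivery.
    private List<String> batch = new ArrayList<>();

    void add(String record) {
        batch.add(record);
    }

    int pendingSize() {
        return batch.size();
    }

    // Copy approach: the sink receives a second list (a shallow copy that
    // holds the same record references), so two lists exist until the sink
    // returns and the original is cleared.
    int deliverWithCopy(Consumer<List<String>> sink) {
        sink.accept(new ArrayList<>(batch));
        int delivered = batch.size();
        batch.clear();
        return delivered;
    }

    // Hand-off approach: read the size first, pass the original list to the
    // sink, then start the next batch in a freshly allocated list.
    int deliverWithHandOff(Consumer<List<String>> sink) {
        int delivered = batch.size();
        sink.accept(batch);
        batch = new ArrayList<>();
        return delivered;
    }

    public static void main(String[] args) {
        BatchHandoffSketch worker = new BatchHandoffSketch();
        worker.add("r1");
        worker.add("r2");
        // This sink empties the list it receives; the count is still correct
        // because the size was read before the hand-off.
        int delivered = worker.deliverWithHandOff(List::clear);
        System.out.println("delivered=" + delivered + ", pending=" + worker.pendingSize());
    }
}
```

In this sketch the size is captured before the hand-off because the sink is free to modify the list it receives. Replacing the field with a new list, rather than calling clear(), also means a sink that keeps a reference to the old list is not affected by later batches.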



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
