Hi John,

Just want to check, have you ever changed the kafka protocol in your job
after using the new cluster? The error message shows that it is caused by
the kafka client and there is a similar error in this issue
<https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>
.

Best,
Biao Geng


John Smith <java.dev....@gmail.com> 于2024年5月16日周四 09:01写道:

> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
> difference using Java 11 and it seems after a week of usage the below
> exception happens.
>
> The task manager is...
>
> 32GB total
>
> And i have the ONLY following memory settings
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
>
>
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
>

Reply via email to