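taskmanager.memory.framework.off-heap.size defaults to a fixed value (under 256 MB); it is not computed as a percentage.
The OOM is most likely caused by downstream backpressure.
My suggestion is to simply increase taskmanager.memory.framework.off-heap.size.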
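
For reference, a minimal sketch of what that change could look like in flink-conf.yaml. The 512m value is only an illustrative assumption, not a number taken from this thread; pick a size that fits your TaskManager's total memory budget and raise it further if the error persists.

```yaml
# flink-conf.yaml (fragment)
# Direct memory reserved for the Flink framework and its dependencies.
# 512m is an assumed example value, not a recommendation for this job.
taskmanager.memory.framework.off-heap.size: 512m
```

The TaskManagers need to be restarted for the new value to take effect.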
On Feb 18, 2021, 16:24 +0800, flink2021 <[email protected]>, wrote:
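> Flink reports an error when consuming from Kafka. The data in Kafka is not large at the moment, about 10 GB.
> Sometimes the error shows up after a few hours, sometimes after 3 to 5 minutes. Could it be that the Kafka parameters are not set properly? The JVM is currently set to 16G, and the TM memory is also set fairly high.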
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things:
> either job(s) require(s) a larger size of JVM direct memory or there is a
> direct memory leak.
> The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased.
> Flink framework and its dependencies also consume the direct memory, mostly
> for network communication.
> The most of network memory is managed by Flink and should not result in
> out-of-memory error. In certain special cases,
> in particular for jobs with high parallelism, the framework may require
> more direct memory which is not managed by Flink.
> In this case 'taskmanager.memory.framework.off-heap.size' configuration
> option should be increased.
> If the error persists then there is probably a direct memory leak in user
> code or some of its dependencies which has to be investigated and fixed.
> The task executor has to be shutdown...
> at java.nio.Bits.reserveMemory(Bits.java:695)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)