Hi John,

Based on the memory config screenshot you provided earlier, each of your TMs should 
have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 1152 MB. 
And no, taskmanager.memory.flink.size plus MaxDirectMemory will not exceed the pod's 
physical memory, since the direct memory components are already counted inside 
flink.size rather than added on top of it. You can check the detailed TM memory 
model [1] and double-check for yourself.
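
To spell out the arithmetic (assuming everything besides taskmanager.memory.flink.size 
and taskmanager.memory.jvm-metaspace.size is left at the Flink 1.14 defaults):

  network memory      = min(max(16384m x 0.1, 64m), 1g) = 1024m
  framework off-heap  = 128m  (default taskmanager.memory.framework.off-heap.size)
  task off-heap       = 0m    (default taskmanager.memory.task.off-heap.size)
  MaxDirectMemorySize = 1024m + 128m + 0m = 1152m

and the total process memory stays well below the 32 GB of the machine:

  flink.size + metaspace + JVM overhead = 16384m + 3072m + 1024m = 20480m (~20 GB)

so the limit being hit is most likely the 1152 MB direct memory cap, not the 
physical memory.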

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).
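
A minimal sketch of how you could enable it (assuming you can edit flink-conf.yaml 
and have jcmd available on the TM host; env.java.opts.taskmanager is the 1.14 option 
name, and NMT adds a small runtime overhead):

  # flink-conf.yaml: turn on NMT for the TaskManager JVM
  env.java.opts.taskmanager: -XX:NativeMemoryTracking=summary

  # on the TM host, once the job has been running for a while
  jcmd <taskmanager-pid> VM.native_memory summary

Direct ByteBuffer allocations typically show up under the 'Other' section (or 
'Internal' on older JDKs), which you can compare against the 1152 MB limit above.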

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Zhanghao Chen
________________________________
From: John Smith <java.dev....@gmail.com>
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: Biao Geng <biaoge...@gmail.com>; user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much to taskmanager.memory.flink.size and that the 
total including MaxDirectMemory is more than what the physical OS has. Is that 
possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula: 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap?

On Thu, May 23, 2024 at 9:01 AM John Smith <java.dev....@gmail.com> wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
Flink... The only difference is Java 11, and I also allocated more JVM heap and the 
actual physical host has more RAM. Maybe I should reduce the JVM heap by a 
gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi John,

A side note here: Flink will set the TM's MaxDirectMemory = Network Memory + 
Task Off-Heap + Framework Off-Heap, overwriting the JVM's default setting, 
regardless of the JVM version.
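
For example, you could confirm the effective limit on a TM host with something like 
the following (the exact number depends on your memory configuration):

  # show the direct memory limit Flink passed to the TM JVM
  ps -ef | grep -o 'XX:MaxDirectMemorySize=[0-9]*'

The same JVM options should also appear near the top of the taskmanager log when 
the TM starts up.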

Best,
Zhanghao Chen
________________________________
From: John Smith <java.dev....@gmail.com>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng <biaoge...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw this or 
didn't. We have not switched to SSL, and our assumption here is that if we had 
switched to SSL the jobs would not work or would produce invalid results. The 
jobs work absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which shows that PLAINTEXT and port 
9092 are used. I also attached a screenshot of the task manager memory usage. As 
well, I read up on the MaxDirectMemory setting of Java 8 vs Java 11: Java 8 by 
default sets the direct memory size to about 87% of the max heap size, while 
Java 11 sets it to 100% of the max heap size.

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xxxxxx-kafka-0001:9092, xxxxxx-0002:9092, xxxxxx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xxxxxx
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxxxxx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

On Thu, May 16, 2024 at 3:20 AM Biao Geng <biaoge...@gmail.com> wrote:
Hi John,

Just want to check: have you ever changed the Kafka protocol in your job since 
moving to the new cluster? The error message shows that it is caused by the Kafka 
client, and there is a similar error in this issue: 
https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink

Best,
Biao Geng


On Thu, May 16, 2024 at 09:01, John Smith <java.dev....@gmail.com> wrote:
I deployed a new cluster, same version as my old cluster (1.14.4); the only 
difference is that it uses Java 11, and it seems that after a week of usage the 
below exception happens.

The task manager is...

32GB total

And I have ONLY the following memory settings:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m




Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct 
out-of-memory error has occurred. This can mean two things: either job(s) 
require(s) a larger size of JVM direct memory or there is a direct memory leak. 
The direct memory can be allocated by user code or some of its dependencies. In 
this case 'taskmanager.memory.task.off-heap.size' configuration option should 
be increased. Flink framework and its dependencies also consume the direct 
memory, mostly for network communication. The most of network memory is managed 
by Flink and should not result in out-of-memory error. In certain special 
cases, in particular for jobs with high parallelism, the framework may require 
more direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at 
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
