We recently migrated to Flink 1.10 and are now running into some memory issues.

Our cluster is:
1) Running inside Kubernetes
2) Running in HA mode
3) Checkpointing/savepointing to an HDFS cluster that also runs inside Kubernetes
4) Using the RocksDB state backend
5) Running on m5d.4xlarge EC2 instances with 64 GB of RAM
6) Running the TaskManager pods without a memory limit set on them
7) Configured with taskmanager.memory.process.size set to 48g (see the snippet after this list)
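
Concretely, in flink-conf.yaml terms our setup boils down to the following (everything else memory-related is left at its defaults):

    state.backend: rocksdb
    taskmanager.memory.process.size: 48g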

When a job tries to restore its keyed state from a checkpoint, we get the following error:
2020-05-27 10:12:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:272)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:165)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
    at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:767)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:727)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:45)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:133)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more

From the trace, the failure happens while the HDFS client is allocating direct buffers to read the checkpoint metadata during the restore, so it looks like we are hitting the JVM's direct memory limit rather than running out of RAM on the node. Do I need to be more explicit with the off-heap memory for RocksDB?
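
If it helps, here is roughly what I was considering trying. My understanding is that in 1.10 the TaskManager's -XX:MaxDirectMemorySize is derived as framework off-heap + task off-heap + network memory, that taskmanager.memory.task.off-heap.size defaults to 0, and that the network portion is already consumed by Flink's own network buffers, so the HDFS client has very little direct-memory headroom. The 1g value below is just a guess on my part:

    taskmanager.memory.process.size: 48g
    # Guessed value: headroom for the direct buffers the HDFS client
    # allocates during restore; this counts toward -XX:MaxDirectMemorySize.
    taskmanager.memory.task.off-heap.size: 1g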

-Steve
