We recently migrated to Flink 1.10 and are running into some memory issues.
Our cluster is:

1) Running inside of Kubernetes
2) Running in HA mode
3) Checkpointing/savepointing to an HDFS cluster inside of Kubernetes
4) Using RocksDB for checkpointing
5) Running on m5d.4xlarge EC2 instances with 64 GB of RAM
6) The taskmanager pods do not have a memory limit set on them
7) We set taskmanager.memory.process.size to 48g

We get the following error:

2020-05-27 10:12:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_376501a366f04bbaab99945c23a40da5_(28/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:272)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:165)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
    at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
    at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:767)
    at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:727)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
    at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.flink.core.io.VersionedIOReadableWritable.read(VersionedIOReadableWritable.java:45)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:133)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more

Do I need to be more explicit with the off-heap memory for RocksDB?

-Steve
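P.S. Concretely, I was wondering whether something like the following in flink-conf.yaml is what's called for. The option names are from the Flink 1.10 memory model; the sizes here are only guesses on my part, not values I've tested:

```yaml
# Total size of the TaskManager process (what we set today).
taskmanager.memory.process.size: 48g

# Explicit off-heap budget for user/task code. In Flink 1.10 this defaults
# to 0 bytes, and together with framework off-heap and network memory it
# determines -XX:MaxDirectMemorySize. The direct-buffer OOM above comes from
# the HDFS client allocating direct buffers during state restore, so raising
# this (1g is an arbitrary guess) seems like it might be the relevant knob.
taskmanager.memory.task.off-heap.size: 1g

# Fraction of total Flink memory given to managed memory, which RocksDB uses
# when state.backend.rocksdb.memory.managed is true (the 1.10 default).
# 0.4 is the default; shown here only for completeness.
taskmanager.memory.managed.fraction: 0.4
```

Is bumping taskmanager.memory.task.off-heap.size (or the framework off-heap size) the intended way to leave headroom for these direct buffers, or should this be handled some other way?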