Hi Mike,

Which version of Flink are you using? Could you try Flink 1.14, which can enable RocksDB logging [1][2], to see what is reported in the RocksDB log? In my experience, this is caused by waiting for a resource (maybe a column family) to close when closing the DB, and you should not hit this problem every time.
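The logging suggestion above comes down to two configuration keys, the ones behind the anchors in [1] and [2]. Below is a minimal plain-Java sketch that renders them as flink-conf.yaml lines. The class and method names and the log directory are hypothetical, and the accepted level names are assumed to be RocksDB's InfoLogLevel constants (DEBUG_LEVEL, INFO_LEVEL, and so on), so check [2] for the values your Flink version actually accepts.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class RocksDbLogConfig {
    // Assumption: level names mirror RocksDB's InfoLogLevel enum constants.
    static final Set<String> LEVELS = Set.of(
            "DEBUG_LEVEL", "INFO_LEVEL", "WARN_LEVEL",
            "ERROR_LEVEL", "FATAL_LEVEL", "HEADER_LEVEL");

    /** Builds the two RocksDB logging options (keys from the Flink 1.14 config docs). */
    static Map<String, String> rocksDbLogOptions(String logDir, String level) {
        if (!LEVELS.contains(level)) {
            throw new IllegalArgumentException("Unknown RocksDB log level: " + level);
        }
        Map<String, String> opts = new LinkedHashMap<>(); // keeps insertion order
        opts.put("state.backend.rocksdb.log.dir", logDir);
        opts.put("state.backend.rocksdb.log.level", level);
        return opts;
    }

    /** Renders the options as "key: value" lines for flink-conf.yaml. */
    static String toFlinkConfYaml(Map<String, String> opts) {
        StringBuilder sb = new StringBuilder();
        opts.forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical log directory; point it at any writable path.
        System.out.print(toFlinkConfYaml(rocksDbLogOptions("/tmp/rocksdb-logs", "DEBUG_LEVEL")));
    }
}
```

Running main prints `state.backend.rocksdb.log.dir: /tmp/rocksdb-logs` and `state.backend.rocksdb.log.level: DEBUG_LEVEL`, which could go into flink-conf.yaml or, presumably, into the Configuration a local environment is created with.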
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-dir
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-level

Best,
Yun Tang
________________________________
From: Mike Barborak <mi...@ec.ai>
Sent: Monday, November 1, 2021 3:36
To: user@flink.apache.org <user@flink.apache.org>
Subject: savepoint.readKeyedState hangs on org.rocksdb.RocksDB.disposeInternal

Hi,

I am using the State Processor API to examine a savepoint. My code works fine when I use a HashMapStateBackend, but for larger savepoints I don't have enough memory, so I need to use an EmbeddedRocksDBStateBackend. Even then, I am able to process some smaller states, but this one:

operatorID,parallelism,maxParallelism,coordinatorState (bytes),sub task states,total size (bytes)
6030185956219c0e7d5d37d16df14a69,1,128,(none),1,16201253369

…hangs with a thread stuck here:

org.rocksdb.RocksDB.disposeInternal(long) RocksDB.java (native)
org.rocksdb.RocksObject.disposeInternal() RocksObject.java:37
org.rocksdb.AbstractImmutableNativeReference.close() AbstractImmutableNativeReference.java:57
org.apache.flink.util.IOUtils.closeQuietly(AutoCloseable) IOUtils.java:275
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose() RocksDBKeyedStateBackend.java:456
org.apache.flink.state.api.input.operator.StateReaderOperator.close() StateReaderOperator.java:120
org.apache.flink.state.api.input.KeyedStateInputFormat.close() KeyedStateInputFormat.java:206
org.apache.flink.runtime.operators.DataSourceTask.invoke() DataSourceTask.java:219
org.apache.flink.runtime.taskmanager.Task.doRun() Task.java:779
org.apache.flink.runtime.taskmanager.Task.run() Task.java:566
java.lang.Thread.run() Thread.java:829

I found this issue: https://issues.apache.org/jira/browse/FLINK-20044, which seems to imply that an error has occurred, but I don't see any sign of an error in my logs.
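Since the hang leaves nothing in the logs, one way to confirm it without attaching a debugger is to poll the JVM's thread stacks for the native frame seen in the trace above. The sketch below assumes the job runs in the same JVM as the caller (a local run, which the /var/folders paths in the logs suggest); for a remote TaskManager, running jstack against that process gives the same information. The class StuckThreadFinder and its methods are hypothetical, built only on the standard Thread.getAllStackTraces().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class StuckThreadFinder {
    /** Names of live threads in this JVM whose current stack has a frame named methodName. */
    static List<String> threadsIn(String methodName) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getMethodName().equals(methodName)) {
                    hits.add(entry.getKey().getName());
                    break; // one matching frame per thread is enough
                }
            }
        }
        return hits;
    }

    /** Polls every periodSeconds on a daemon thread and reports threads inside methodName. */
    static ScheduledExecutorService watch(String methodName, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "stuck-thread-finder");
            t.setDaemon(true); // never keeps the JVM alive on its own
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            List<String> hits = threadsIn(methodName);
            if (!hits.isEmpty()) {
                System.err.println("Threads currently in " + methodName + ": " + hits);
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        // The calling thread is itself inside threadsIn(...), so it finds itself.
        System.out.println(threadsIn("threadsIn").contains(Thread.currentThread().getName()));
    }
}
```

For example, calling `StuckThreadFinder.watch("disposeInternal", 30)` before `env.execute()` would report any thread inside a method named disposeInternal every 30 seconds. A single hit is expected during a normal close; the same thread showing up on consecutive checks is what would point at a hang like this one.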
My code basically looks like this:

DataSource<StateReport> source = savepoint.readKeyedState("re-process-1", new ReFunctionStateReader());
…
source.writeAsFormattedText(…);
env.execute();

And below is how the logs end. Any suggestions as to what I might do to resolve this issue?

Thanks,
Mike

9281 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink (TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) (9373e99f41ebc27a485fdd3bb3496a1a) switched from DEPLOYING to INITIALIZING.
9281 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink (TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) (9373e99f41ebc27a485fdd3bb3496a1a) switched from INITIALIZING to RUNNING.
9291 WARN [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) exceeded the 80 characters length limit and was truncated.
9291 WARN [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.
9299 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager uses directory /var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280 for spill files.
9308 INFO [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 (9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9308 INFO [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 (9b8eb1c7a89c2b36a94049376c5dae6c).
9311 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 9b8eb1c7a89c2b36a94049376c5dae6c.
9313 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Attempting to load RocksDB native library and store it under '/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280'
9321 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1) (9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9561 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Successfully loaded RocksDB native library
9570 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting managed memory shared cache for RocksDB.
9573 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108864 bytes
9824 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - Starting to restore from state handle: KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[84184, 97979226, 217371561, 295595622, 439574029, 558281882, 733284918, 974056904, 1077346944, 1207588358, 1309072112, 1404981561, 1555072389, 1648312051, 1770101727, 1871634781, 2087771527, 2187184752, 2414219813, 2515316945, 2631804199, 2729979188, 2922225187, 3045642428, 3262427401, 3358964955, 3466713255, 3593016146, 3714840493, 3818058962, 3911382076, 4011213147, 4141534943, 4273318080, 4381607680, 4486938271, 4573049384, 4708982976, 4887623520, 5041168778, 5154668536, 5288860823, 5374337001, 5484378918, 5613171728, 5799983770, 5914309992, 6027189739, 6147120423, 6245180677, 6359314082, 6473151610, 6584315864, 6745588200, 6849061980, 6945922434, 7076120744, 7189799673, 7298032068, 7401483802, 7523698152, 7625978817, 7851967489, 8048030846, 8163864487, 8258026739, 8361709853, 8488631881, 8587903996, 8717003177, 8832686400, 8924435995, 9008880140, 9098243461, 9209217981, 9321370064, 9528996685, 9669462006, 9771717841, 9924405572, 10006668961, 10102187175, 10208713272, 10315707983, 10448831942, 10581272525, 10694443074, 10803791729, 10925152867, 11028906474, 11191213540, 11296313712, 11448709650, 11574331504, 11844259366, 11927519634, 12015857009, 12121419982, 12235944888, 12311643736, 12422467285, 12535417272, 12628866385, 12741620669, 12830885243, 12910504273, 13012574020, 13152586397, 13270425326, 13392334040, 13509000111, 13626241487, 13727493803, 13916299006, 14059489943, 14190951395, 14378881966, 14575613436, 14682829220, 14912173683, 15224725149, 15403872561, 15498873556, 15619714817, 15735403180, 15842446504, 15970747949, 16085182172]}, stateHandle=RelativeFileStateHandle State: file:/Users/mikeb/Documents/braid/braid-job/savepoints/savepoint-1e4955-ac043eda3ee4/0fa5d199-32b9-4176-b80b-d8f7bdd21f3b, 0fa5d199-32b9-4176-b80b-d8f7bdd21f3b [16201253369 bytes]}.
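The offsets in this last log line give a rough size profile of the state. This rests on an assumption: that each offset marks where a key group's data begins in the savepoint file. The range 0 to 127 matches maxParallelism 128 in the table above. Under that assumption, the difference between consecutive offsets is a key group's byte size, and the last group would run to the end of the 16201253369-byte file. Here is a plain-Java sketch of that calculation; the class and method names are hypothetical.

```java
final class KeyGroupSizes {
    /**
     * Byte size of each key group, assuming offsets[i] is where key group i starts
     * in the savepoint file and each group runs until the next offset (or endBytes).
     */
    static long[] sizes(long[] offsets, long endBytes) {
        long[] sizes = new long[offsets.length];
        for (int i = 0; i < offsets.length; i++) {
            long end = (i + 1 < offsets.length) ? offsets[i + 1] : endBytes;
            sizes[i] = end - offsets[i];
        }
        return sizes;
    }

    /** Index of the largest entry (the first one on ties). */
    static int largest(long[] values) {
        int best = 0;
        for (int i = 1; i < values.length; i++) {
            if (values[i] > values[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // First four offsets from the log; the fifth offset (439574029) bounds key group 3.
        long[] offsets = {84184L, 97979226L, 217371561L, 295595622L};
        long[] sizes = sizes(offsets, 439574029L);
        for (int i = 0; i < sizes.length; i++) {
            System.out.println("key group " + i + ": " + sizes[i] + " bytes");
        }
        System.out.println("largest: key group " + largest(sizes));
    }
}
```

With those four offsets, the sizes come out to 97895042, 119392335, 78224061 and 143978407 bytes, so key group 3 is the largest of the four. Passing the full 128-entry array with 16201253369L as the end would show whether a few key groups dominate the roughly 16 GB that, with parallelism 1, a single RocksDB instance restores and later has to dispose.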