Hi Mike,

Which version of Flink are you using? Could you try Flink 1.14, which supports
enabling RocksDB's own logging [1][2], and check what is reported in the RocksDB
LOG? In my experience, this is caused by waiting for some resource (maybe a
column family) to be released while the DB is being closed, and you should not
run into this problem every time.


[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-dir
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#state-backend-rocksdb-log-level
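For a local State Processor API job, the two options could be set roughly as
below. This is a minimal sketch against the Flink 1.14 Java API (it needs
flink-java, flink-statebackend-rocksdb and flink-state-processor-api on the
classpath). The class name RocksDbLoggingSketch and the directory
/tmp/rocksdb-logs are hypothetical, and it is an assumption that configuring
the EmbeddedRocksDBStateBackend with the same Configuration is what gets the
options applied to the RocksDB instance the reader opens. On a cluster, putting
the two keys into flink-conf.yaml should have the same effect.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class RocksDbLoggingSketch {

    /** Builds a Configuration carrying the RocksDB logging options documented for Flink 1.14. */
    static Configuration rocksDbLogConfig(String logDir) {
        Configuration conf = new Configuration();
        // Verbosity of RocksDB's native LOG file.
        conf.setString("state.backend.rocksdb.log.level", "INFO_LEVEL");
        // Hypothetical directory: it must exist and be writable by the task manager.
        conf.setString("state.backend.rocksdb.log.dir", logDir);
        return conf;
    }

    public static void main(String[] args) throws Exception {
        String savepointPath = args[0]; // path of the savepoint to inspect
        Configuration conf = rocksDbLogConfig("/tmp/rocksdb-logs");

        // Local DataSet environment that also receives the same Configuration.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

        // Assumption: configuring the backend explicitly ensures the log options
        // reach the RocksDB instance opened while reading keyed state.
        EmbeddedRocksDBStateBackend backend =
                new EmbeddedRocksDBStateBackend()
                        .configure(conf, Thread.currentThread().getContextClassLoader());

        ExistingSavepoint savepoint = Savepoint.load(env, savepointPath, backend);
        // Continue as before: savepoint.readKeyedState(...), add a sink, env.execute().
    }
}
```

After a run, the RocksDB LOG files under that directory should contain the
entries written around the time of the hang, which may show what the close is
waiting on.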

Best
Yun Tang

________________________________
From: Mike Barborak <mi...@ec.ai>
Sent: Monday, November 1, 2021 3:36
To: user@flink.apache.org <user@flink.apache.org>
Subject: savepoint.readKeyedState hangs on org.rocksdb.RocksDB.disposeInternal

Hi,

I am using the State Processor API to examine a savepoint. My code works fine
when I use a HashMapStateBackend, but for larger savepoints I don’t have enough
memory, so I need to use an EmbeddedRocksDBStateBackend. With it, I am able to
process some smaller states, but this one:

operatorID,parallelism,maxParallelism,coordinatorState (bytes),sub task states,total size (bytes)
6030185956219c0e7d5d37d16df14a69,1,128,(none),1,16201253369

…hangs with a thread stuck here:

org.rocksdb.RocksDB.disposeInternal(long) RocksDB.java (native)
org.rocksdb.RocksObject.disposeInternal() RocksObject.java:37
org.rocksdb.AbstractImmutableNativeReference.close() AbstractImmutableNativeReference.java:57
org.apache.flink.util.IOUtils.closeQuietly(AutoCloseable) IOUtils.java:275
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose() RocksDBKeyedStateBackend.java:456
org.apache.flink.state.api.input.operator.StateReaderOperator.close() StateReaderOperator.java:120
org.apache.flink.state.api.input.KeyedStateInputFormat.close() KeyedStateInputFormat.java:206
org.apache.flink.runtime.operators.DataSourceTask.invoke() DataSourceTask.java:219
org.apache.flink.runtime.taskmanager.Task.doRun() Task.java:779
org.apache.flink.runtime.taskmanager.Task.run() Task.java:566
java.lang.Thread.run() Thread.java:829

I found this issue:

https://issues.apache.org/jira/browse/FLINK-20044

which seems to imply that an error has occurred, but I don’t see any sign of an
error in my logs. My code basically looks like this:

DataSource<StateReport> source = savepoint.readKeyedState("re-process-1", new ReFunctionStateReader());
…
source.writeAsFormattedText(…);
env.execute();

And below is how the logs end. Any suggestions as to what I might do to resolve 
this issue?

Thanks,
Mike

9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from DEPLOYING to INITIALIZING.

9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from INITIALIZING to RUNNING.

9291 WARN  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) exceeded the 80 
characters length limit and was truncated.

9291 WARN  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 
characters length limit and was truncated.

9299 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager 
uses directory 
/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280
 for spill files.

9308 INFO  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
(9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.

9308 INFO  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for CHAIN 
DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
(9b8eb1c7a89c2b36a94049376c5dae6c).

9311 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FINISHED to JobManager for task CHAIN DataSource 
(at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
9b8eb1c7a89c2b36a94049376c5dae6c.

9313 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Attempting to load RocksDB native library and store it under 
'/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280'

9321 INFO  [flink-akka.actor.default-dispatcher-6] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource 
(at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1) 
(9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.

9561 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Successfully loaded RocksDB native library

9570 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Getting managed memory shared cache for RocksDB.

9573 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Obtained shared RocksDB cache of size 67108864 bytes

9824 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - 
Starting to restore from state handle: 
KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
 endKeyGroup=127}, offsets=[84184, 97979226, 217371561, 295595622, 439574029, 
558281882, 733284918, 974056904, 1077346944, 1207588358, 1309072112, 
1404981561, 1555072389, 1648312051, 1770101727, 1871634781, 2087771527, 
2187184752, 2414219813, 2515316945, 2631804199, 2729979188, 2922225187, 
3045642428, 3262427401, 3358964955, 3466713255, 3593016146, 3714840493, 
3818058962, 3911382076, 4011213147, 4141534943, 4273318080, 4381607680, 
4486938271, 4573049384, 4708982976, 4887623520, 5041168778, 5154668536, 
5288860823, 5374337001, 5484378918, 5613171728, 5799983770, 5914309992, 
6027189739, 6147120423, 6245180677, 6359314082, 6473151610, 6584315864, 
6745588200, 6849061980, 6945922434, 7076120744, 7189799673, 7298032068, 
7401483802, 7523698152, 7625978817, 7851967489, 8048030846, 8163864487, 
8258026739, 8361709853, 8488631881, 8587903996, 8717003177, 8832686400, 
8924435995, 9008880140, 9098243461, 9209217981, 9321370064, 9528996685, 
9669462006, 9771717841, 9924405572, 10006668961, 10102187175, 10208713272, 
10315707983, 10448831942, 10581272525, 10694443074, 10803791729, 10925152867, 
11028906474, 11191213540, 11296313712, 11448709650, 11574331504, 11844259366, 
11927519634, 12015857009, 12121419982, 12235944888, 12311643736, 12422467285, 
12535417272, 12628866385, 12741620669, 12830885243, 12910504273, 13012574020, 
13152586397, 13270425326, 13392334040, 13509000111, 13626241487, 13727493803, 
13916299006, 14059489943, 14190951395, 14378881966, 14575613436, 14682829220, 
14912173683, 15224725149, 15403872561, 15498873556, 15619714817, 15735403180, 
15842446504, 15970747949, 16085182172]}, stateHandle=RelativeFileStateHandle 
State: 
file:/Users/mikeb/Documents/braid/braid-job/savepoints/savepoint-1e4955-ac043eda3ee4/0fa5d199-32b9-4176-b80b-d8f7bdd21f3b,
 0fa5d199-32b9-4176-b80b-d8f7bdd21f3b [16201253369 bytes]}.
