Florian Schmidt created FLINK-9336:
--------------------------------------

             Summary: Queryable state fails with Exception after task manager 
restore
                 Key: FLINK-9336
                 URL: https://issues.apache.org/jira/browse/FLINK-9336
             Project: Flink
          Issue Type: Bug
          Components: Queryable State
    Affects Versions: 1.5.0
            Reporter: Florian Schmidt


 

The following example can be used to reproduce the Exception:
 # Run an app with a FlatMap which exposes its MapState through Queryable State 
with RocksDB
 # Run a queryable state client: ✔️
 # Kill the TM
 # Start a new TM
 # Run a queryable state client: ❌ (StackTrace below)

 

This happens if we run our queryable state client after the new TM has started 
and the app is running but *before the FlatMap has processed elements again*

 

With a little help and some debugging I found out that very likely the reason 
is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ 
**which is initialised through the code path of 
_RocksDBMapState::serializeCurrentKeyAndNamespace. **_This will only be called 
when processing an element and therefore accessing the state. If now the state 
is accessed before that through queryable state, this value will be 0 and 
therefore the deserialization of the user key will fail as seen below. 

 

The observed stacktrace

 
{code:java}
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
 at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
 Caused by: java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748)
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to