I am testing Flink in a Kubernetes cluster and am finding that a job gets 
caught in a recovery loop.  The logs show that a checkpoint file cannot be 
found, even though the Flink web UI reports that checkpoints are being taken.  
Any advice on how to resolve this is most appreciated.

Note on below: I can easily replicate this with a single TaskManager (>1 slots) 
and a job parallelism of 1, or two TaskManagers (4 slots each) and a job 
parallelism of 8.

Setup:
- Flink 1.6.0
- Kubernetes cluster.
- 1 JobManager node, 2 TaskManager nodes. 
- RocksDB backend with incremental checkpointing.
- No persistent volume is mounted on any of the three nodes.  In 
production, we would obviously need a persistent volume on the JobManager.
- Job submitted and running such that the job is parallelized over both nodes 
(i.e. each TM has 4 task slots; job parallelism = 5).

Test:
- Let the job collect a few checkpoints, say, 9 checkpoints.
- Kill one of the two TMs (kubectl delete pods <pod-name>).
- New TM pod starts.

Result:
- After the new TM starts, the job will cycle through FAILING -> RUNNING -> 
FAILING -> RUNNING ->...

Relevant Information
Contents of JobManager's pod:
user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
chk-9  shared  taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls -al flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
-rw-r--r-- 1 flink flink 23665 Oct 29 18:07 flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared
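
The listing above shows chk-9 holding only _metadata on the JobManager pod, with shared and taskowned empty. As a rough sketch, the same check could be scripted as below. It assumes the directory layout shown in the listing and that Python is available wherever the directory can be read; the function names are hypothetical and not part of Flink.

```python
from pathlib import Path


def summarize_checkpoint_dir(job_checkpoint_dir):
    """Map each subdirectory of a job's checkpoint directory to its files.

    Assumes the layout from the listing: chk-<n> directories plus
    'shared' and 'taskowned'.
    """
    root = Path(job_checkpoint_dir)
    summary = {}
    for entry in sorted(root.iterdir()):
        if entry.is_dir():
            # Record every regular file below this entry, relative to it.
            summary[entry.name] = sorted(
                str(p.relative_to(entry)) for p in entry.rglob("*") if p.is_file()
            )
    return summary


def metadata_only_checkpoints(summary):
    """Return the chk-* directories that hold nothing but a _metadata file."""
    return [
        name
        for name, files in summary.items()
        if name.startswith("chk-") and files == ["_metadata"]
    ]
```

Run against flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28, metadata_only_checkpoints would be expected to report chk-9 if the directory still looks as it does in the listing.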

Stacktrace
A similar message is repeated over and over in the logs:
- "Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
- "Could not restore operator state backend for WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
- "Could not restore operator state backend for StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"

2018-10-29 18:12:48,965 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
        ... 5 more
Caused by: java.io.FileNotFoundException: /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
        at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more
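
Since similar messages repeat for several operators, it may help to pull the distinct missing paths out of the TaskManager and JobManager logs and compare them with what is actually on disk. The following is a minimal sketch that assumes the FileNotFoundException message has the form shown in the trace above; the function name is hypothetical.

```python
import re

# Matches the FileNotFoundException message whether or not the path has been
# wrapped onto its own line: \s* also spans line breaks.
MISSING_FILE = re.compile(
    r"java\.io\.FileNotFoundException:\s*(\S+)\s*\(No such file or directory\)"
)


def missing_checkpoint_files(log_text):
    """Return the distinct missing paths, in order of first appearance."""
    seen = []
    for path in MISSING_FILE.findall(log_text):
        if path not in seen:
            seen.append(path)
    return seen
```

Applied to the trace above, this would yield the single path under chk-9 that the restore attempted to open.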
