Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException
As Vino pointed out, you need to configure a checkpoint directory that is accessible from all TMs. Otherwise you won't be able to recover the state if the task gets scheduled to a different TaskManager. Usually, people use HDFS or S3 for that.

Cheers,
Till
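For illustration, a minimal sketch of such a setup (the S3 bucket name, checkpoint interval, and pipeline are made up here, and one of the flink-s3-fs-hadoop / flink-s3-fs-presto filesystems is assumed to be installed):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SharedCheckpointJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint to a URI that every JobManager and TaskManager pod can reach,
            // not a pod-local path; the second argument enables incremental checkpoints.
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true));
            env.enableCheckpointing(60_000); // one checkpoint per minute

            // Trivial pipeline just so the sketch runs end to end.
            env.fromElements(1, 2, 3).print();
            env.execute("shared-checkpoint-job");
        }
    }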
Re: Job fails to restore from checkpoint in Kubernetes with FileNotFoundException
Hi John,

Is the file system configured by RocksDBStateBackend HDFS? [1]

Thanks,
vino.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html#the-rocksdbstatebackend
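If HDFS is what is available, a hedged sketch of that configuration (the namenode host, port, and path below are placeholders, not values from this thread):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HdfsCheckpointExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // RocksDB keeps its working state on local disk but writes checkpoints to
            // this URI, so an hdfs:// scheme makes the state visible to every TM.
            env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true));
        }
    }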
Job fails to restore from checkpoint in Kubernetes with FileNotFoundException
I am testing Flink in a Kubernetes cluster and am finding that a job gets caught in a recovery loop. The logs show that the issue is that a checkpoint cannot be found, although checkpoints are being taken according to the Flink web UI. Any advice on how to resolve this is most appreciated.

Note on the below: I can easily replicate this with a single TaskManager (more than one slot) and a job parallelism of 1, or with two TaskManagers (4 slots each) and a job parallelism of 8.

Setup:
- Flink 1.6.0
- Kubernetes cluster
- 1 JobManager node, 2 TaskManager nodes
- RocksDB backend with incremental checkpointing
- No persistent volume is mounted on any of the three nodes. In production, we would obviously need a persistent volume on the JobManager.
- Job submitted and running such that it is parallelized over both nodes (i.e. each TM has 4 task slots; job parallelism = 5).

Test:
- Let the job collect a few checkpoints, say 9.
- Kill one of the two TMs (kubectl delete pods <pod-name>).
- A new TM pod starts.

Result:
- After the new TM starts, the job cycles through FAILING -> RUNNING -> FAILING -> RUNNING -> ...

Relevant Information

Contents of the JobManager's pod (note that the last two directory listings come back empty):

user@host:~$ kubectl exec -it flink-jobmanager-5bbfcb567-v299g -- /bin/sh
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28
chk-9  shared  taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9
_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
flink-data/dwell-sliding-window-demo/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls -al flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
-rw-r--r-- 1 flink flink 23665 Oct 29 18:07 flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/_metadata
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/taskowned
# ls flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/shared

Stacktrace

A similar message is repeated over and over in the logs:
- "Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5)"
- "Could not restore operator state backend for WindowOperator_31dd93ebcd1f26006d3b41a7b50b5d82_(3/5)"
- "Could not restore operator state backend for StreamSource_ab0f3d44654e8df0b68f0b30a956403c_(2/5)"

2018-10-29 18:12:48,965 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (f786f0c2e3a4405fe81a1eed720d5c28) switched from state RUNNING to FAILING.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_b0eed879993a32985f1fde75e55fe3e3_(5/5) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
    ... 5 more
Caused by: java.io.FileNotFoundException: /opt/flink/flink-data/test-job/checkpoints/f786f0c2e3a4405fe81a1eed720d5c28/chk-9/b8865e25-3761-4ddf-a466-f035b639184b (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
    at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:495)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStat
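To make the failure mode concrete, here is a hedged reconstruction of the state backend configuration that would produce the stack trace above (the file:// checkpoint URI is inferred from the LocalFileSystem / LocalDataInputStream frames and the /opt/flink/flink-data path in the log; the checkpoint interval is made up, and the actual job code does not appear in the thread):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalCheckpointRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // A file:// URI resolves against each pod's own filesystem, so checkpoint
            // files written by one TaskManager pod are invisible to the pod that
            // replaces it. Restore then fails with the FileNotFoundException above.
            env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-data/test-job/checkpoints", true));
            env.enableCheckpointing(60_000);
        }
    }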