[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions
[ https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056583#comment-17056583 ]

Jason Kania commented on FLINK-16470:
---

[~gjy], the restart strategy was just the default and had no delay, so that explains the rapid restarts. I did not understand that the restart strategy was related to this interaction with ZooKeeper and the RegionStrategy. Thanks for connecting the two. I will try configuring a restart strategy and see what happens the next time a similar failure occurs.

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---
>
> Key: FLINK-16470
> URL: https://issues.apache.org/jira/browse/FLINK-16470
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Coordination
> Affects Versions: 1.9.2
> Environment: Latest patch current Ubuntu release with latest Java 8 JRE.
> Reporter: Jason Kania
> Priority: Major
>
> When a networking error occurred that prevented access to the shared folder mounted over NFS, the CheckpointCoordinator flooded the logs with the following:
>
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 from state handle under /0158365. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.<init>(FileInputStream.java:138)}}
> {{ at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)}}
> {{ at org.a
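Jason attributes the rapid restarts to the default restart strategy having no delay, and the trace shows every restart re-entering `CheckpointCoordinator.restoreLatestCheckpointedState`, which fails again while the NFS mount is unreachable. A rough model shows how strongly the delay drives log volume, assuming each failed recovery attempt logs one stack trace and takes a fixed time to fail. The class `RestartFloodEstimate`, its method, and the timing figures (a one-minute outage, 5 ms per failed attempt) are hypothetical; they are not Flink code or measurements from this incident.

```java
/**
 * Hypothetical back-of-the-envelope model (not Flink code): how many recovery
 * attempts, and therefore logged stack traces, start during a storage outage.
 */
public final class RestartFloodEstimate {

    private RestartFloodEstimate() {}

    /**
     * Counts attempts that start while the checkpoint storage is unreachable.
     * Attempt k is assumed to start at k * (attemptMillis + delayMillis), k = 0, 1, 2, ...
     *
     * @param outageMillis  how long the shared (e.g. NFS) folder is unreachable
     * @param attemptMillis assumed time for one recovery attempt to fail
     * @param delayMillis   restart delay configured in the restart strategy
     */
    public static long attemptsDuringOutage(long outageMillis, long attemptMillis, long delayMillis) {
        long period = attemptMillis + delayMillis;
        if (period <= 0) {
            throw new IllegalArgumentException("attempt time plus delay must be positive");
        }
        if (outageMillis <= 0) {
            return 0;
        }
        // Ceiling division: the number of k >= 0 with k * period < outageMillis.
        return (outageMillis + period - 1) / period;
    }

    public static void main(String[] args) {
        long outage = 60_000; // assumed one-minute outage
        long attempt = 5;     // assumed 5 ms for each failed restore
        System.out.println("0 s delay:  " + attemptsDuringOutage(outage, attempt, 0));
        System.out.println("10 s delay: " + attemptsDuringOutage(outage, attempt, 10_000));
    }
}
```

Under these assumed numbers the model gives 12000 attempts with a 0 s delay and 6 with a 10 s delay; the real ratio depends on how long each failed restore actually takes.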
[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions
[ https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056142#comment-17056142 ]

Gary Yao commented on FLINK-16470:
---

I should add that the default delay in Flink 1.9 is 0s. This was changed in 1.10, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#increase-of-default-restart-delay-flink-13884
[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions
[ https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056058#comment-17056058 ]

Gary Yao commented on FLINK-16470:
---

[~longtimer] Can you post your restart strategy configuration? Did you configure a delay in your restart strategy? See https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
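Gary's question hinges on whether the restart strategy pauses between attempts. The stdlib-only sketch below imitates the idea of a fixed-delay restart strategy: a bounded number of attempts with a fixed pause after each failure. `FixedDelayRetry` and its `run` method are hypothetical names for illustration only; they are not Flink's restart strategy API, which is configured as described in the documentation Gary links.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

/**
 * Hypothetical illustration (not Flink's API): retry a recovery action up to
 * maxAttempts times, sleeping delayMillis after each failed attempt, in the
 * spirit of a fixed-delay restart strategy.
 */
public final class FixedDelayRetry {

    private final int maxAttempts;
    private final long delayMillis;

    public FixedDelayRetry(int maxAttempts, long delayMillis) {
        if (maxAttempts < 1 || delayMillis < 0) {
            throw new IllegalArgumentException("need maxAttempts >= 1 and delayMillis >= 0");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** Runs the action until it succeeds or attempts are exhausted; rethrows the last failure. */
    public <T> T run(Callable<T> action) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // One log line per failed attempt; with delayMillis == 0 they arrive back to back.
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // pause before the next attempt
                }
            }
        }
        throw last; // non-null here because maxAttempts >= 1 and every attempt failed
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        FixedDelayRetry retry = new FixedDelayRetry(3, 100);
        // Simulated restore that fails twice (storage unreachable), then succeeds.
        String result = retry.run(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new FileNotFoundException("checkpoint file not reachable");
            }
            return "restored";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Here `main` prints two failure lines to stderr and then `restored after 3 attempts`. Setting `delayMillis` to 0 removes the pause, which matches the 0s default delay Gary describes for Flink 1.9.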