[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions

2020-03-10 Thread Jason Kania (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056583#comment-17056583
 ] 

Jason Kania commented on FLINK-16470:
-

[~gjy], the restart strategy was just the default and had no delay so that 
explains the rapid restart. I did not understand that the restart strategy was 
related to this interaction with the Zookeeper and the RegionStrategy. Thanks 
for connecting the two. I will try with a restart strategy and see what happens 
another time if there is a similar failure.

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---
>
> Key: FLINK-16470
> URL: https://issues.apache.org/jira/browse/FLINK-16470
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Latest patch current Ubuntu release with latest java 8 
> JRE.
>Reporter: Jason Kania
>Priority: Major
>
> When a networking error occurred that prevented access to the shared folder 
> mounted over NFS, the CheckpointCoordinator flooded the logs with the 
> following:
>  
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
> from state handle under /0158365. This indicates that the 
> retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: 
> /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.(FileInputStream.java:138)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)}}
> {{ at 
> org.a

[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions

2020-03-10 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056142#comment-17056142
 ] 

Gary Yao commented on FLINK-16470:
--

I should add that the default delay in Flink 1.9 is 0s. This was changed in 
1.10, see 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#increase-of-default-restart-delay-flink-13884

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---
>
> Key: FLINK-16470
> URL: https://issues.apache.org/jira/browse/FLINK-16470
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Latest patch current Ubuntu release with latest java 8 
> JRE.
>Reporter: Jason Kania
>Priority: Major
>
> When a networking error occurred that prevented access to the shared folder 
> mounted over NFS, the CheckpointCoordinator flooded the logs with the 
> following:
>  
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
> from state handle under /0158365. This indicates that the 
> retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: 
> /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.(FileInputStream.java:138)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)}}
> {{ at 
> org.apache.flink.runtime.state.filesystem.FileStateHand

[jira] [Commented] (FLINK-16470) Network failure causes Checkpoint Coordinator to flood disk with exceptions

2020-03-10 Thread Gary Yao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056058#comment-17056058
 ] 

Gary Yao commented on FLINK-16470:
--

[~longtimer] Can you post your restart strategy configuration? Did you 
configure a delay in your restart strategy? See 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies

> Network failure causes Checkpoint Coordinator to flood disk with exceptions
> ---
>
> Key: FLINK-16470
> URL: https://issues.apache.org/jira/browse/FLINK-16470
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.9.2
> Environment: Latest patch current Ubuntu release with latest java 8 
> JRE.
>Reporter: Jason Kania
>Priority: Major
>
> When a networking error occurred that prevented access to the shared folder 
> mounted over NFS, the CheckpointCoordinator flooded the logs with the 
> following:
>  
> {{org.apache.flink.util.FlinkException: Could not retrieve checkpoint 158365 
> from state handle under /0158365. This indicates that the 
> retrieved state handle is broken. Try cleaning the state handle store.}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:345)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)}}
> {{ at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1014)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.resetTasks(AdaptedRestartPipelinedRegionStrategyNG.java:205)}}
> {{ at 
> org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG.lambda$createResetAndRescheduleTasksCallback$1(AdaptedRestartPipelinedRegionStrategyNG.java:149)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$3(FutureUtils.java:202)}}
> {{ at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$scheduleWithDelay$4(FutureUtils.java:226)}}
> {{ at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)}}
> {{ at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)}}
> {{ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)}}
> {{ at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)}}
> {{ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)}}
> {{ at akka.actor.Actor.aroundReceive(Actor.scala:517)}}
> {{ at akka.actor.Actor.aroundReceive$(Actor.scala:515)}}
> {{ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)}}
> {{ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)}}
> {{ at akka.actor.ActorCell.invoke(ActorCell.scala:561)}}
> {{ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)}}
> {{ at akka.dispatch.Mailbox.run(Mailbox.scala:225)}}
> {{ at akka.dispatch.Mailbox.exec(Mailbox.scala:235)}}
> {{ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)}}
> {{ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)}}
> {{ at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)}}
> {{Caused by: java.io.FileNotFoundException: 
> /mnt/shared/completedCheckpoint53ed9d9197f7 (No such file or directory)}}
> {{ at java.io.FileInputStream.open0(Native Method)}}
> {{ at java.io.FileInputStream.open(FileInputStream.java:195)}}
> {{ at java.io.FileInputStream.(FileInputStream.java:138)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)}}
> {{ at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)}}
> {{ at 
> org.apache.flink.runtime.state.filesystem.FileSta