Re: Issue while restarting from SavePoint
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> ... 2 more
>> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.
>> at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
>> ... 10 more
>>
>> Thanks.
>>
>> From: Anirudh Mallem
>> Date: Tuesday, October 18, 2016 at 2:07 PM
>> To: "user@flink.apache.org"
>> Subject: Issue while restarting from SavePoint
>>
>> Hi,
>> I am relatively new to Flink and I was experimenting with the savepoints feature.
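The root cause above, "Cannot map old state for task ... to the new program", is raised when state stored in the savepoint cannot be matched to any operator in the resubmitted job. A minimal, purely illustrative Python sketch of that matching step (the function and variable names here are hypothetical, not Flink's actual implementation):

```python
# Conceptual model of savepoint restore: state is keyed by a stable
# operator/task ID, and restore fails as soon as the new job graph no
# longer contains an ID that the savepoint holds state for.

def restore_savepoint(savepoint_state, new_job_operator_ids):
    """savepoint_state: dict mapping operator ID -> state blob.
    new_job_operator_ids: set of IDs present in the resubmitted job graph."""
    mapped = {}
    for op_id, state in savepoint_state.items():
        if op_id not in new_job_operator_ids:
            # Mirrors Flink's "Cannot map old state for task ..." failure.
            raise ValueError(
                f"Cannot map old state for task {op_id} to the new program")
        mapped[op_id] = state
    return mapped

# An unchanged topology restores cleanly:
restore_savepoint({"src-1": b"offsets"}, {"src-1", "map-2"})

# A changed topology (auto-generated IDs shifted) fails the restore:
try:
    restore_savepoint({"src-1": b"offsets"}, {"src-9"})
except ValueError as e:
    print(e)
```

In Flink itself, the savepoint documentation's remedy is to assign explicit, stable operator IDs via `uid(String)` on each DataStream operator before taking the savepoint; otherwise the auto-generated IDs can change whenever the program's topology changes, making old state unmappable.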
Issue while restarting from SavePoint
Hi,
I am relatively new to Flink and I was experimenting with the savepoints feature. I have an HA cluster running with 1 Master and 4 Workers. The flink-conf.yaml is as follows:

#==
# Common
#==
jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net

# The port where the JobManager's main actor system listens for messages.
jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM
jobmanager.heap.mb: 512

# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

# The number of task slots that each TaskManager offers. Each slot runs one
# parallel pipeline.
taskmanager.numberOfTaskSlots: 8

# Specify whether TaskManager memory should be allocated when starting up (true)
# or when memory is required in the memory manager (false)
taskmanager.memory.preallocate: false

# The parallelism used for programs that did not specify any other parallelism.
parallelism.default: 1

env.java.home: /usr/local/java

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s

#==
# Web Frontend
#==

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
jobmanager.web.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.
#jobmanager.web.submit.enable: false

#==
# Streaming state checkpointing
#==

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem
#
state.backend: filesystem

# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems
# (or any local file system under Windows), or "s3://" for S3 file systems.
#
state.backend.fs.checkpointdir: file:///home/amallem/
state.savepoints.dir: file:///home/amallem/save/

#==
# Master High Availability (required configuration)
#==

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
recovery.mode: zookeeper
recovery.zookeeper.quorum: stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
recovery.zookeeper.storageDir: file:///home/amallem/recovery
recovery.zookeeper.path.root: /flink
recovery.zookeeper.path.namespace: /cluster_one

Query: I am able to take a savepoint and then cancel my current job, but when I try to start the job again using the savepoint I get the error "ProgramInvocationException: JobManager did not respond within 6ms". I checked ZooKeeper and everything seems fine. I also followed the recommendations in the following post:
http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
Is there any configuration I am missing to enable restarting of the job? Any help will be appreciated. Thanks.

Regards,
Anirudh
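Regarding the "JobManager did not respond" error in the query above: the CLI only waits a bounded ask timeout for the JobManager's answer, and in an HA setup a savepoint submission can exceed it. A sketch of a flink-conf.yaml fragment that raises that timeout, assuming the `akka.client.timeout` key from the Flink configuration reference of this era; the value shown is an arbitrary example, not a verified fix:

```yaml
# Illustrative fragment (example value only - tune for your cluster):
# give "flink run -s <savepoint>" more time to get an answer from the
# JobManager before failing with ProgramInvocationException.
akka.client.timeout: 600 s
```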