Hi Lei,

Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1 the hash of an operator was tied to the parallelism but starting with 1.2 that shouldn't happen anymore.

Also, are you changing the parallelism job-wide or are there operators with differing parallelism? For example, could there be a source with parallelism 1 and an operator that had parallelism 1 after that which now has a different parallelism?

Best,
Aljoscha

> On 16. Oct 2017, at 06:28, Lei Chen <ley...@gmail.com> wrote:
>
> Hi,
>
> We're trying to implement some module to help autoscale our pipeline which is
> built with Flink on YARN. According to the document, the suggested procedure
> seems to be:
>
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism.
>
> However, step 2 always gave error
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
> hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint
> state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program,
> because the operator is not available in the new program. If you want to
> allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The procedure worked fine if parallelism was not changed.
>
> I also want to mention that I didn't manually specify an OperatorID in my job.
> The documentation does mention that manual OperatorID assignment is suggested;
> I'm just curious whether that is mandatory in my case to fix the problem I'm
> seeing, given that my program doesn't change at all, so the autogenerated
> OperatorID should be unchanged after the parallelism increase?
>
> thanks,
> Lei
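
For reference, the two-step procedure quoted above can be sketched with the Flink CLI roughly as follows. This is a minimal dry-run sketch, not the poster's actual script: the job ID, savepoint directory, savepoint path, jar name, and the parallelism and container counts are hypothetical placeholders, and the `-m yarn-cluster -yn` flags assume a per-job YARN submission with a Flink 1.3-era CLI. The functions only print the commands so they can be reviewed before anything is run against a cluster.

```shell
#!/usr/bin/env bash
# Dry-run sketch: prints the Flink CLI commands for "cancel with savepoint,
# then restore with a higher parallelism". Nothing is sent to a cluster.

FLINK_BIN="${FLINK_BIN:-bin/flink}"   # assumed location of the Flink CLI

# Step 1: cancel the running job and trigger a savepoint into a target directory.
cancel_with_savepoint_cmd() {
  local savepoint_dir="$1" job_id="$2"
  echo "$FLINK_BIN cancel -s $savepoint_dir $job_id"
}

# Step 2: resubmit from the savepoint with a new parallelism (-p) and a new
# number of YARN containers (-yn; per-job YARN mode is an assumption here).
restore_rescaled_cmd() {
  local savepoint_path="$1" parallelism="$2" containers="$3" jar="$4"
  echo "$FLINK_BIN run -m yarn-cluster -yn $containers -s $savepoint_path -p $parallelism $jar"
}

# Hypothetical placeholder values, for illustration only.
cancel_with_savepoint_cmd "hdfs:///tmp/savepoints" "<jobID>"
restore_rescaled_cmd "<savepointPath>" 8 4 "my-job.jar"
```

Adding `-n` (`--allowNonRestoredState`) to the `run` command would let the restore proceed as the error message suggests, but the state of the unmatched operator is then skipped rather than restored, so it is not a substitute for stable operator IDs.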