Hi Lei,

Which version of Flink are you running? I'm guessing >= 1.3.x? In Flink 1.1 the
auto-generated hash of an operator depended on the parallelism, but starting
with 1.2 that should no longer happen.

Also, are you changing the parallelism job-wide, or are there operators with
differing parallelism? For example, could there be a source with parallelism 1
followed by an operator that also had parallelism 1 before and now has a
different parallelism?
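
Independent of the answers, assigning explicit uids to operators removes the
dependency on the auto-generated hashes when mapping savepoint state. Here is a
minimal sketch, assuming a trivial pipeline. The class name, the uid strings
and the operators are placeholders for illustration, not taken from your job:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example job; only the uid(...) calls are the point here.
public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .uid("example-source")   // explicit ID for the source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("example-map")      // explicit ID for the map operator
                .print()
                .uid("example-sink");    // explicit ID for the sink

        env.execute("uid-example");
    }
}
```

Note that a savepoint taken before the uids were added still refers to the old
auto-generated IDs, so this only helps for savepoints taken after the change.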

Best,
Aljoscha

> On 16. Oct 2017, at 06:28, Lei Chen <ley...@gmail.com> wrote:
> 
> Hi, 
> 
> We're trying to implement a module to autoscale our pipeline, which is
> built with Flink on YARN. According to the documentation, the suggested
> procedure seems to be:
> 
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism. 
> 
> However, step 2 always fails with this error:
> 
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint 
> hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint 
> state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, 
> because the operator is not available in the new program. If you want to 
> allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>       at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> The procedure worked fine if parallelism was not changed. 
> 
> I should also mention that I didn't manually assign operator IDs in my job.
> The documentation does suggest assigning operator IDs manually. I'm just
> curious whether that is mandatory to fix the problem I'm seeing, given that
> my program doesn't change at all, so the auto-generated operator IDs should
> be unchanged after the parallelism increase?
> 
> thanks,
> Lei
