Hi Aljoscha, I'm using version 1.3.0 and changing job-wide parallelism.
Lei

On Thu, Oct 19, 2017 at 9:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Lei,
>
> Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1
> the hash of an operator was tied to the parallelism but starting with 1.2
> that shouldn't happen anymore.
>
> Also, are you changing the parallelism job-wide or are there operators
> with differing parallelism? For example, could there be a source with
> parallelism 1 and an operator that had parallelism 1 after that which now
> has a different parallelism?
>
> Best,
> Aljoscha
>
> > On 16. Oct 2017, at 06:28, Lei Chen <ley...@gmail.com> wrote:
> >
> > Hi,
> >
> > We're trying to implement a module to help autoscale our pipeline, which
> > is built with Flink on YARN. According to the documentation, the suggested
> > procedure seems to be:
> >
> > 1. cancel job with savepoint
> > 2. start new job with increased YARN TM number and parallelism.
> >
> > However, step 2 always gave this error:
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to
> > savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot
> > map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the
> > new program, because the operator is not available in the new program. If
> > you want to allow to skip this, you can set the --allowNonRestoredState
> > option on the CLI.
> >     at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
> >     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> > The procedure worked fine if parallelism was not changed.
> >
> > I also want to mention that I didn't manually specify an OperatorID in my
> > job. The documentation does mention that manual OperatorID assignment is
> > suggested; I'm just curious whether that is mandatory in my case to fix
> > the problem I'm seeing, given that my program doesn't change at all, so
> > the autogenerated operatorID should be unchanged after the parallelism
> > increase?
> >
> > thanks,
> > Lei
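
For reference, the two-step procedure described in the original message could be scripted roughly as follows. This is only a sketch, not the module from the thread: the class name `RescaleCommands`, both method names, and the sample job ID, savepoint locations, and jar path are hypothetical placeholders. The `-s`, `-p`, and `--allowNonRestoredState` options are the Flink CLI options referred to above; YARN-specific options (such as the number of TaskManagers) are left out and would need to be added for a real deployment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RescaleCommands {

    // Step 1: cancel the running job and take a savepoint into savepointDir.
    static List<String> cancelWithSavepoint(String jobId, String savepointDir) {
        return Arrays.asList("flink", "cancel", "-s", savepointDir, jobId);
    }

    // Step 2: resubmit the same jar from the savepoint with a new job-wide parallelism.
    // allowNonRestoredState lets the restore skip savepoint state that cannot be
    // mapped to an operator in the new program (that state is then not restored),
    // so it is an explicit opt-in here.
    static List<String> resumeWithParallelism(String savepointPath, int parallelism,
                                              String jarPath, boolean allowNonRestoredState) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<String> cmd = new ArrayList<>(Arrays.asList("flink", "run", "-s", savepointPath));
        if (allowNonRestoredState) {
            cmd.add("--allowNonRestoredState");
        }
        cmd.add("-p");
        cmd.add(Integer.toString(parallelism));
        cmd.add(jarPath);
        return cmd;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String jobId = "<jobId>";
        String savepointDir = "hdfs:///tmp/savepoints";
        String savepointPath = "hdfs:///tmp/savepoints/<savepoint>";
        String jar = "pipeline.jar";

        System.out.println(String.join(" ", cancelWithSavepoint(jobId, savepointDir)));
        System.out.println(String.join(" ", resumeWithParallelism(savepointPath, 8, jar, false)));
    }
}
```

The argument lists can be handed to `ProcessBuilder` as they are. Building them as lists rather than as one string avoids quoting problems when a path contains spaces.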