Hi Aljoscha,

I'm using version 1.3.0 and changing job-wide parallelism.

Lei

On Thu, Oct 19, 2017 at 9:47 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Lei,
>
> Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1
> the hash of an operator was tied to the parallelism but starting with 1.2
> that shouldn't happen anymore.
>
> Also, are you changing the parallelism job-wide or are there operators
> with differing parallelism? For example, could there be a source with
> parallelism 1 and an operator that had parallelism 1 after that which now
> has a different parallelism?
>
> Best,
> Aljoscha
>
>
> On 16. Oct 2017, at 06:28, Lei Chen <ley...@gmail.com> wrote:
>
> Hi,
>
> We're trying to implement some module to help autoscale our pipeline which
> is built  with Flink on YARN. According to the document, the suggested
> procedure seems to be:
>
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism.
>
> However, step 2 always gave error
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot
> map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the
> new program, because the operator is not available in the new program. If
> you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
> at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoade
> r.loadAndValidateSavepoint(SavepointLoader.java:130)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.re
> storeSavepoint(CheckpointCoordinator.java:1140)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobMa
> nager.scala:1386)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.s
> cala:1372)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.s
> cala:1372)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
> dTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
> uture.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
> exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
> l.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
> orkerThread.java:107)
>
> The procedure worked fine if parallelism was not changed.
>
> Also want to mention that I didn't manually specify OperatorID in my job. The
> document does mentioned manually OperatorID assignment is suggested, just
> curious is that mandatory in my case to fix the problem I'm seeing, given
> that my program doesn't change at all so the autogenerated operatorID
> should be unchanged after parallelism increase?
>
> thanks,
> Lei
>
>
>

Reply via email to