Hi ChangZhuo,

This seems to be a current limitation of unaligned checkpoints [1]. Are
you using any broadcast streams in your application?

[1] https://issues.apache.org/jira/browse/FLINK-22815
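
To illustrate why the restore fails, here is a deliberately simplified, hypothetical Java model. It is not Flink's actual SubtaskStateMapper. It only mirrors what the quoted stack trace shows: during restore, each new subtask index is asked which old subtasks' in-flight data it must take over (the getNewToOldSubtasksMapping / getOldSubtasks frames), and a point-wise edge throws. The names EdgeMapper, ALL_TO_ALL, POINTWISE and newToOld are invented for this sketch. It also assumes that a point-wise edge whose parallelism is unchanged keeps a 1:1 mapping.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

/** Hypothetical toy model of reassigning in-flight data after a rescale (not Flink code). */
public class RescaleMappingSketch {

    /** Hypothetical stand-ins for how an edge's in-flight data could be redistributed. */
    enum EdgeMapper {
        /** All-to-all edge (e.g. after shuffle()): every new subtask takes data from every old subtask. */
        ALL_TO_ALL {
            @Override
            int[] oldSubtasks(int newIndex, int oldParallelism, int newParallelism) {
                return IntStream.range(0, oldParallelism).toArray();
            }
        },
        /** Point-wise edge (forward/rescale): no mapping is defined once parallelism changes. */
        POINTWISE {
            @Override
            int[] oldSubtasks(int newIndex, int oldParallelism, int newParallelism) {
                if (oldParallelism == newParallelism) {
                    return new int[] {newIndex}; // assumed: unchanged parallelism keeps a 1:1 mapping
                }
                throw new UnsupportedOperationException(
                        "Cannot rescale the given pointwise partitioner.");
            }
        };

        abstract int[] oldSubtasks(int newIndex, int oldParallelism, int newParallelism);
    }

    /** For each new subtask index, collect the old subtasks whose in-flight data it must restore. */
    static int[][] newToOld(EdgeMapper mapper, int oldParallelism, int newParallelism) {
        return IntStream.range(0, newParallelism)
                .mapToObj(i -> mapper.oldSubtasks(i, oldParallelism, newParallelism))
                .toArray(int[][]::new);
    }

    public static void main(String[] args) {
        // Rescaling from 2 to 3 subtasks over an all-to-all edge yields a mapping.
        System.out.println(Arrays.deepToString(newToOld(EdgeMapper.ALL_TO_ALL, 2, 3)));
        // The same rescale over a point-wise edge fails, like the quoted log.
        try {
            newToOld(EdgeMapper.POINTWISE, 2, 3);
        } catch (UnsupportedOperationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Running main prints the mapping for the all-to-all edge and then the failure for the point-wise one. In this model, the exception's hint to add an explicit shuffle() corresponds to turning a POINTWISE edge into an ALL_TO_ALL one.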

Best,
D.

On Tue, Oct 19, 2021 at 3:58 AM ChangZhuo Chen (陳昌倬) wrote:

> Hi,
>
> We found that Flink 1.14.0 cannot rescale when using the following
> configuration:
>
> * Kubernetes per-job session mode
> * Reactive mode
> * Unaligned checkpoint
> * The latest snapshot is a checkpoint, not a savepoint
>
> It can, however, rescale from a savepoint.
>
>
> The following is the redacted log from when the error happens:
>
>     2021-10-18 09:31:14,093 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager 
> [email protected]://flink@<censored>-jobmanager:6123/user/rpc/jobmanager_2
> for job 00000000000000000000000000000000 from the resource manager.
>     2021-10-18 09:31:14,096 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
>     2021-10-18 09:31:14,096 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
> [] - Closing
> KubernetesLeaderElectionDriver{configMapName='<censored>-00000000000000000000000000000000-jobmanager-leader'}.
>     2021-10-18 09:31:14,096 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
> [] - Stopped to watch for
> rt-flink/<censored>-00000000000000000000000000000000-jobmanager-leader,
> watching id:6716d415-d6b8-4155-80dc-eddf39a795fb
>     2021-10-18 09:31:14,106 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> 00000000000000000000000000000000.
>     2021-10-18 09:31:14,119 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data for job
> 00000000000000000000000000000000.
>     2021-10-18 09:31:14,421 INFO
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
> [] - Application FAILED:
>     java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: FAILED
>             at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$5(ApplicationDispatcherBootstrap.java:345)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
>             at
> org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
>             at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
>             at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
>             at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at akka.dispatch.OnComplete.internal(Future.scala:300)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at akka.dispatch.OnComplete.internal(Future.scala:297)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>             at
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> [?:?]
>             at
> java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>             at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>             at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> [?:?]
>     Caused by:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: FAILED
>             at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             ... 52 more
>     Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>             at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             ... 52 more
>     Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.
>     Did you change the partitioner to forward or rescale?
>     It may also help to add an explicit shuffle().
>             at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$25(FutureUtils.java:1407)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> ~[?:?]
>             at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> ~[?:?]
>             at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>             at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> ~[?:?]
>             at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[?:?]
>             at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> ~[?:?]
>             at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[?:?]
>             at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> ~[?:?]
>             at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> ~[?:?]
>             at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> ~[?:?]
>             at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
>             at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
>             at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
>             at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> ~[?:?]
>             at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
>             at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> ~[?:?]
>             at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
>             at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
>             ... 5 more
>     Caused by: java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.
>     Did you change the partitioner to forward or rescale?
>     It may also help to add an explicit shuffle().
>             at
> org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> ~[?:?]
>             at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
>             at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
>             at java.lang.Thread.run(Thread.java:834) ~[?:?]
>     Caused by: java.lang.UnsupportedOperationException: Cannot rescale the
> given pointwise partitioner.
>     Did you change the partitioner to forward or rescale?
>     It may also help to add an explicit shuffle().
>             at 
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at 
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180) ~[?:?]
>             at
> java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
> ~[?:?]
>             at
> java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699) ~[?:?]
>             at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
>             at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
>             at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) ~[?:?]
>             at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> ~[?:?]
>             at
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
> ~[?:?]
>             at
> org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:138)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at 
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.TaskStateAssignment.getOutputMapping(TaskStateAssignment.java:369)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeResultSubpartitionStates(StateAssignmentOperation.java:379)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:195)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:140)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1548)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1460)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:993)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:983)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>             at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> ~[?:?]
>             at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> ~[?:?]
>             at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
>             at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
>             at java.lang.Thread.run(Thread.java:834) ~[?:?]
>     2021-10-18 09:31:14,426 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
> StandaloneApplicationClusterEntryPoint down with application status FAILED.
> Diagnostics null.
>     2021-10-18 09:31:14,427 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
> down rest endpoint.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>
