Hi ChangZhuo,

This seems to be a current limitation of unaligned checkpoints when rescaling [1]. Are you using any broadcast streams in your application?
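A rough way to picture why the restore fails: with unaligned checkpoints, the checkpoint also contains in-flight buffers. When the job is rescaled, each new subtask has to know which old subtasks' buffers it should take over. For a keyed (hash) exchange such a mapping can be derived from key-group ranges. For a pointwise connection (forward or rescale) there is no such mapping once the parallelism changes, which is what the UnsupportedOperationException in the trace below reports. The Java sketch that follows is a simplified, hypothetical model of that idea, not Flink's implementation. The class, enum and method names are made up, and the key-group range formula is modelled on the one Flink uses for keyed state. Flink's real logic (SubtaskStateMapper, RescaleMappings) differs in detail.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of redistributing in-flight (channel) state
 * when an operator is rescaled. NOT Flink's implementation.
 */
final class RescaleMappingSketch {

    /** Hypothetical kinds of exchange between two operators. */
    enum Exchange { KEYED, POINTWISE }

    /** First key group owned by a subtask (modelled on Flink's key-group split). */
    static int rangeStart(int subtask, int parallelism, int maxParallelism) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by a subtask. */
    static int rangeEnd(int subtask, int parallelism, int maxParallelism) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    /** Old subtasks whose buffered data the given new subtask must read after rescaling. */
    static List<Integer> oldSubtasksFor(Exchange exchange, int newSubtask,
                                        int oldParallelism, int newParallelism,
                                        int maxParallelism) {
        if (exchange == Exchange.POINTWISE) {
            if (oldParallelism != newParallelism) {
                // A forward/rescale connection has no key space to re-split, so this
                // model has no well-defined old-to-new mapping for buffered records.
                throw new UnsupportedOperationException(
                        "Cannot rescale the given pointwise partitioner.");
            }
            return List.of(newSubtask); // unchanged parallelism: 1:1 mapping
        }
        // Keyed exchange: take every old subtask whose key-group range overlaps ours.
        int start = rangeStart(newSubtask, newParallelism, maxParallelism);
        int end = rangeEnd(newSubtask, newParallelism, maxParallelism);
        List<Integer> result = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int oldStart = rangeStart(old, oldParallelism, maxParallelism);
            int oldEnd = rangeEnd(old, oldParallelism, maxParallelism);
            if (oldStart <= end && start <= oldEnd) {
                result.add(old);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Keyed exchange scaled from 2 to 3 subtasks with 128 key groups.
        for (int i = 0; i < 3; i++) {
            System.out.println("new " + i + " <- old "
                    + oldSubtasksFor(Exchange.KEYED, i, 2, 3, 128));
        }
        // Pointwise exchange scaled from 2 to 3 subtasks: no mapping exists.
        try {
            oldSubtasksFor(Exchange.POINTWISE, 0, 2, 3, 128);
        } catch (UnsupportedOperationException e) {
            System.out.println("pointwise: " + e.getMessage());
        }
    }
}
```

Running main prints `new 0 <- old [0]`, `new 1 <- old [0, 1]`, `new 2 <- old [1]` and then the pointwise error. In this model, the keyed case always has an answer, while the pointwise case only works if the parallelism is unchanged. That is also why the error message suggests an explicit shuffle() in front of the rescaled operator.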
[1] https://issues.apache.org/jira/browse/FLINK-22815

Best,
D.

On Tue, Oct 19, 2021 at 3:58 AM ChangZhuo Chen (陳昌倬) <[email protected]> wrote:
> Hi,
>
> We found that Flink 1.14.0 cannot rescale when using the following
> configuration:
>
> * Kubernetes per-job session mode
> * Reactive mode
> * Unaligned checkpoints
> * The latest checkpoint is a checkpoint, not a savepoint
>
> It can, however, rescale from a savepoint.
>
>
> The following is the redacted log from when the error happens:
>
> 2021-10-18 09:31:14,093 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager [email protected]://flink@<censored>-jobmanager:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000 from the resource manager.
> 2021-10-18 09:31:14,096 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
> 2021-10-18 09:31:14,096 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] - Closing KubernetesLeaderElectionDriver{configMapName='<censored>-00000000000000000000000000000000-jobmanager-leader'}.
> 2021-10-18 09:31:14,096 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for rt-flink/<censored>-00000000000000000000000000000000-jobmanager-leader, watching id:6716d415-d6b8-4155-80dc-eddf39a795fb
> 2021-10-18 09:31:14,106 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
> 2021-10-18 09:31:14,119 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
> 2021-10-18 09:31:14,421 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application FAILED:
> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$5(ApplicationDispatcherBootstrap.java:345) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
>     at org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250) ~[?:?]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
>     at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.OnComplete.internal(Future.scala:300) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.OnComplete.internal(Future.scala:297) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED
>     at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     ... 52 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     ... 52 more
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$25(FutureUtils.java:1407) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[?:?]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
>     ... 5 more
> Caused by: java.util.concurrent.CompletionException: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().
>     at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:59) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.lang.UnsupportedOperationException: Cannot rescale the given pointwise partitioner.
> Did you change the partitioner to forward or rescale?
> It may also help to add an explicit shuffle().
>     at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper$6.getOldSubtasks(SubtaskStateMapper.java:180) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.lambda$getNewToOldSubtasksMapping$0(SubtaskStateMapper.java:202) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180) ~[?:?]
>     at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104) ~[?:?]
>     at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699) ~[?:?]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) ~[?:?]
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) ~[?:?]
>     at org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:138) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getOutputMapping(TaskStateAssignment.java:369) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeResultSubpartitionStates(StateAssignmentOperation.java:379) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:195) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:140) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1548) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1460) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:993) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:983) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> 2021-10-18 09:31:14,426 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
> 2021-10-18 09:31:14,427 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
