And now the job is stuck in a suspended state and I seem to have no way to get it out of that state again!
On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <[email protected]> wrote:

> The plot thickens... I was able to rescale down... just not back up
> again!!!
>
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m localhost:8081
> Waiting for response...
> ------------------ Running/Restarting Jobs -------------------
> 07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology (RUNNING)
> --------------------------------------------------------------
> No scheduled jobs.
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
> Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not rescale job ebc20a700c334f61ea03ecdf3d8939ca.
>     at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>     at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor.aroundReceive(Actor.scala:502)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
>     at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
>     at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     ... 20 more
>
> On Fri, 7 Feb 2020 at 11:40, Stephen Connolly <[email protected]> wrote:
>
>> So I am looking at the Flink Management REST API... and, as I see it,
>> there are two paths to rescale a running topology:
>>
>> 1. Stop the topology with a savepoint and then start it up again from
>>    the new savepoint; or
>> 2. Use the /jobs/:jobid/rescaling
>>    <https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling>
>>    endpoint.
>>
>> The first one seems to work just fine.
>>
>> The second one seems to just blow up every time I try to use it... I'll
>> get things like:
>>
>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-log-txt
>>
>> The above was for the topology
>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
>> running with options:
>>
>>     --source parallel
>>
>> Things are even worse with --source iterator, as that has no checkpoint
>> state to recover from.
>>
>> Right now I am trying to discover what preconditions must be met to
>> safely call the rescaling endpoint and actually have it work... I should
>> note that so far I have not managed to get it to work at all!!!
>>
>> One of the things we are trying to do is add some automation to enable
>> scale-up / scale-down as we see surges in processing load. We want an
>> automated system that handles small deltas on its own and pages an
>> on-call engineer when excess load persists. In that regard, I'd like to
>> know what the automation should check to decide whether it can rescale
>> via the dedicated endpoint or whether it should use the reliable (but
>> presumably slower) path of stop-with-savepoint and start-from-savepoint.
>>
>> The
>> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
>> job I have been using is just a quick job to let me test the automation on
>> a local cluster. It is designed to output a strictly increasing sequence of
>> numbers without missing any, optionally double them, and then print them
>> out. The different sources are me experimenting with different types of
>> operator to see which kinds of topology work with the rescaling endpoint.
>>
>> Thanks in advance
