Hello,
I have a batch pipeline with Beam 2.22.0 reading about 200 GiB from
BigQuery, mapping the data and writing it out via CassandraIO.
When I run the pipeline with the classic Java Flink runner on a Flink 1.10.1
cluster, I run into the following issue:
When I launch the pipeline with

bin/flink run -d -c test.beam.BigQueryToCassandra -j /mnt/pipelines/beam_pipelines.jar \
    --runner=FlinkRunner --appName=backload \
    --numberOfExecutionRetries=20 --executionRetryDelay=10000 \
    --project=XXX --parallelism=1

the shell returns after several minutes and throws:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:90)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at ch.ricardo.di.beam.BigQueryToCassandra.main(BigQueryToCassandra.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:290)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:952)
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:84)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:53)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:150)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:87)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:947)
    ... 20 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:274)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 4 more
command terminated with exit code 1
However, the JobManager logs show that the pipeline has been launched and is
running successfully. Since the BigQuery extract job is triggered from the
JobManager, I suspect there is some timeout I need to increase so that the
extract job can finish before the timeout is hit and the submission no longer
fails. I also tried BATCH_FORCED, but that didn't do the trick.
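One reading of the trace that would match this suspicion (unverified): the client path goes through `FutureUtils.retryOperationWithDelay`, and the server then answers with `DuplicateJobSubmissionException`. The toy model below is plain Java, not Flink code; `ToyDispatcher`, `ToyClient` and all timings are hypothetical. It only shows how a submission that outlasts the client's per-attempt timeout, followed by a retry with the same job ID, gets rejected as a duplicate even though the first submission goes ahead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only, NOT Flink code: ToyDispatcher, ToyClient and all timings are hypothetical.
final class ToyDispatcher {
    private final Set<String> submitted = ConcurrentHashMap.newKeySet();
    private final long submitMillis;

    ToyDispatcher(long submitMillis) {
        this.submitMillis = submitMillis;
    }

    // Registers the job ID first, then spends submitMillis before acknowledging.
    // A second call with the same ID is rejected, like the server-side error in the trace.
    String submit(String jobId) throws InterruptedException {
        if (!submitted.add(jobId)) {
            throw new IllegalStateException("Job has already been submitted.");
        }
        Thread.sleep(submitMillis);
        return "accepted " + jobId;
    }
}

final class ToyClient {
    // Waits at most timeoutMillis per attempt and retries with the SAME job ID on timeout.
    static String submitWithRetry(ToyDispatcher dispatcher, String jobId,
                                  long timeoutMillis, int maxAttempts) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<String> reply = pool.submit(() -> dispatcher.submit(jobId));
                try {
                    return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // The client stops waiting, but the earlier submission keeps running.
                } catch (ExecutionException e) {
                    // Surface the server-side failure, e.g. the duplicate-submission error.
                    Throwable cause = e.getCause();
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new RuntimeException(cause);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            throw new IllegalStateException("Timed out after " + maxAttempts + " attempts");
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With a 10 ms submission and a 1 s timeout, `submitWithRetry` returns `accepted job-1`; with a 500 ms submission and a 100 ms timeout, a retry fails with `Job has already been submitted.` although the first attempt was registered.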
My second question is about parallelism in batch mode: even though I run my
pipeline with parallelism=2 and Flink shows a parallelism of 2 for each task,
only one TaskManager is actually doing work; according to its logs, the other
one is idle. It looks as if the bundles were not evenly distributed after the
export from BigQuery finishes. Is there anything I can do about it?
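A toy model of what "not evenly distributed" could look like (plain Java, not Flink or Beam code; `SplitAssignmentModel` and the split sizes are hypothetical, and it is an assumption that the work reaches the workers as discrete splits). Each split goes to the least-loaded worker, which is roughly what equally fast workers pulling the next split as soon as they are free would do.

```java
// Toy model only, NOT Flink or Beam code: SplitAssignmentModel and the split sizes are hypothetical.
final class SplitAssignmentModel {
    // Gives each split to the worker with the least work so far, which approximates
    // equally fast workers pulling the next split as soon as they become free.
    static long[] assign(long[] splitSizes, int workers) {
        long[] load = new long[workers];
        for (long size : splitSizes) {
            int leastLoaded = 0;
            for (int w = 1; w < workers; w++) {
                if (load[w] < load[leastLoaded]) {
                    leastLoaded = w;
                }
            }
            load[leastLoaded] += size;
        }
        return load;
    }
}
```

`assign(new long[]{100}, 2)` gives loads of `[100, 0]`, so a single split keeps one worker busy regardless of the parallelism, while `assign(new long[]{25, 25, 25, 25}, 2)` gives `[50, 50]`.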
Cheers,
Tobi