Hi Kenneth,

Thanks for looking into this.

> Do you mean that you cloned https://github.com/apache/beam-starter-java
> and tried to run on the FlinkRunner with version 1.16?

We have been running our Beam jobs in production on Google Cloud Dataflow
for 2 years, and we now want to move all of them to the FlinkRunner. To
test compatibility first, we tried the word-count example code
<https://beam.apache.org/get-started/wordcount-example/> on Flink 1.16.
We noticed this issue when running that sample code with the Beam version
we use for our production pipelines.
Are there any recommendations from the community on how to get this setup
running? We need the latest Beam SDK, as a few of our critical pipelines
rely on newer Beam IOs.

Thanks and Regards
Rajath

On Thu, Oct 24, 2024 at 8:41 PM Kenneth Knowles <k...@apache.org> wrote:

> Hi Rajath,
>
> Thanks for raising this.
>
> Do you mean that you cloned https://github.com/apache/beam-starter-java
> and tried to run on the FlinkRunner with version 1.16?
>
> It looks like the operation that is failing has been in Beam since 2.50.0
> (https://github.com/apache/beam/pull/26193), so I would expect that there
> is a change in Flink at 1.16 that causes getNetRuntime() to be unsupported.
>
> Looking at the Flink code, the error thrown has been there since 1.0.0
> (introduced in 1.0.0
> <https://github.com/apache/flink/commit/00e44eda17052c4b3de4f9690c250a45df6f407f>,
> then moved in 1.10.0
> <https://github.com/apache/flink/commit/c36b35e6876ecdc717dade653e8554f9d8b543c9>).
> So the issue is caused (at least on the surface) by somehow ending up in
> this detached codepath even for attached jobs.
>
> On the one hand, it would be great to understand how to configure things
> so it could work for your use and versions. But if a code fix is required,
> it is too much to go that far back in time. So then we'd want to look at
> how things run with recent versions of Beam and Flink.
>
> Kenn
>
> On Thu, Oct 24, 2024 at 7:50 AM Rajath BK <rajath.u...@gmail.com> wrote:
>
>> Hi Beam Community,
>>
>> We have been running our Beam jobs on Google Cloud Dataflow for a while
>> now. We are now evaluating migrating them to run on Flink.
>> *All of our jobs are built using Beam SDK version 2.56.0*.
>>
>> During this exercise, we have experienced an issue where our jobs do not
>> come up on the FlinkRunner with Flink (v1.16.x) when we use the same
>> pipelines built with 2.56.0, but when we downgrade the Beam SDK version
>> to 2.49 the jobs start running.
>> However, when we downgrade we lose some of the features offered in the
>> newer Beam SDKs (RequestResponseIO among others, which is a critical
>> component in one of our jobs).
>>
>> We tried to run the `WordCount` example pipeline with Beam versions from
>> 2.49 to 2.56.0, each with the matching runner version, on Flink 1.16.x,
>> but to no avail.
>>
>> These are the failure logs:
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>> Caused by: java.lang.RuntimeException: Pipeline execution failed
>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
>>     at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
>>     at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
>>     at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>> 2024-07-25 11:45:12,956 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
>> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_372]
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_372]
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_372]
>>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_372]
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_372]
>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_372]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:287) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
>>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
>>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
>>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_3172f478-923e-4f49-b4aa-a63947c06051.jar:1.14.3]
>>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
>>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
>>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
>>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
>> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>>     ... 13 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>> Caused by: java.lang.RuntimeException: Pipeline execution failed
>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
>>     at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available.
>>     at org.apache.flink.core.execution.DetachedJobExecutionResult.getNetRuntime(DetachedJobExecutionResult.java:51) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.createAttachedPipelineResult(FlinkPipelineExecutionEnvironment.java:171) ~[?:?]
>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:154) ~[?:?]
>>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325) ~[?:?]
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310) ~[?:?]
>>     at com.gojek.de.jobs.JobManager.main(JobManager.java:83) ~[?:?]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
>>     ... 12 more
>>
>> I have gone through the Beam community issues (e.g. Issue/29660
>> <https://github.com/apache/beam/issues/29660>) and the release notes.
>> Apart from the version-compatibility mismatch (which we have already
>> addressed), I didn't find anything that could directly cause this issue.
>>
>> QUESTION: Has anyone in the community experienced such issues and found a
>> workaround to run Beam pipelines built with Beam SDKs > 2.49.0 on Flink?
>> In particular, is anybody successfully running Beam pipelines built with
>> SDK version 2.56 or newer?
>>
>> Any help is appreciated!
>>
>> Thanks in advance
>>
>> Regards
>> Rajath
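To make the failure mode Kenneth describes concrete, here is a minimal, dependency-free Java sketch (Java 8 compatible, matching the 1.8.0_372 runtime in the trace). `JobResult`, `AttachedResult`, `DetachedResult`, `runtimeUnguarded` and `runtimeGuarded` are hypothetical stand-ins modeled on the `DetachedJobExecutionResult.getNetRuntime` and `FlinkPipelineExecutionEnvironment.createAttachedPipelineResult` frames in the trace; they are not the real Flink or Beam API. The sketch only shows that code which assumes an attached result and calls `getNetRuntime()` unconditionally fails as soon as the submission path hands back a detached result, while a guarded call degrades gracefully.

```java
import java.util.Optional;

/**
 * Dependency-free sketch of the failure mode discussed in this thread.
 * All types here are HYPOTHETICAL stand-ins for Flink's JobExecutionResult /
 * DetachedJobExecutionResult; they are not the real Flink or Beam API.
 */
public class DetachedResultSketch {

    /** Hypothetical minimal result type exposing only the net runtime. */
    interface JobResult {
        long getNetRuntime();
    }

    /** Attached submission: the client waited for the job, so runtime is known. */
    static final class AttachedResult implements JobResult {
        private final long netRuntimeMillis;

        AttachedResult(long netRuntimeMillis) {
            this.netRuntimeMillis = netRuntimeMillis;
        }

        @Override
        public long getNetRuntime() {
            return netRuntimeMillis;
        }
    }

    /** Detached submission: the client did not wait, so no runtime exists. */
    static final class DetachedResult implements JobResult {
        @Override
        public long getNetRuntime() {
            // The real Flink class throws InvalidProgramException (see trace);
            // an unchecked JDK exception stands in for it here.
            throw new UnsupportedOperationException(
                "Job was submitted in detached mode. Results of job execution, "
                    + "such as accumulators, runtime, etc. are not available.");
        }
    }

    /** Unguarded pattern: assumes an attached result, like the failing frame. */
    static long runtimeUnguarded(JobResult result) {
        return result.getNetRuntime();
    }

    /** Guarded pattern: only asks for the runtime when the result can provide it. */
    static Optional<Long> runtimeGuarded(JobResult result) {
        if (result instanceof DetachedResult) {
            return Optional.empty();
        }
        return Optional.of(result.getNetRuntime());
    }

    public static void main(String[] args) {
        System.out.println(runtimeGuarded(new AttachedResult(1234L)));
        System.out.println(runtimeGuarded(new DetachedResult()));
        try {
            runtimeUnguarded(new DetachedResult());
        } catch (UnsupportedOperationException e) {
            System.out.println("unguarded call failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints the attached runtime, an empty result for the detached case, and the error message from the unguarded call. Which Flink setting makes an application-mode submission (note the `ApplicationDispatcherBootstrap` frames) return a detached result is not established in this thread; Flink's `execution.attached` option is one plausible thing to check, but that is an assumption.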