Hi Kyle,

Thank you for creating the JIRA ticket. I think my best option right now is
to wait for a Beam version that runs on Flink 1.10 - unless there is a new
Beam release around the corner :)

Best,
Tobi

On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <[email protected]> wrote:

> Hi Tobi,
>
> This seems like a bug with Beam 2.19. I filed
> https://issues.apache.org/jira/browse/BEAM-9345 to track the issue.
>
> > What puzzles me is that the session cluster should be allowed to have
> multiple environments in detached mode - or am I wrong?
>
> It looks like that check is removed in Flink 1.10:
> https://issues.apache.org/jira/browse/FLINK-15201
>
> Thanks for reporting.
> Kyle
>
> On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias <[email protected]>
> wrote:
>
>> Hello,
>>
>> I am trying to upgrade a Flink session cluster from 1.8 to 1.9 and Beam
>> from 2.16.0 to 2.19.0.
>> Everything went quite smoothly: the local runner and the local Flink
>> runner work flawlessly.
>>
>> However, after I:
>>   1. Generate a Beam jar for the FlinkRunner via Maven (mvn package
>> -PFlinkRunner)
>>   2. Glue that into a Flink 1.9 Docker image
>>   3. Start the image as a Standalone Session Cluster
>>
>> and try to launch the first pipeline, I get the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Failed to construct instance from factory method
>> FlinkRunner#fromOptions(interface
>> org.apache.beam.sdk.options.PipelineOptions)
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>         at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>         at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>         at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>         at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>         at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>         at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>         at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>         at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: java.lang.RuntimeException: Failed to construct instance from
>> factory method FlinkRunner#fromOptions(interface
>> org.apache.beam.sdk.options.PipelineOptions)
>>         at
>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>         at
>> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
>>         at
>> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
>>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>         at
>> ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>         ... 9 more
>> Caused by: java.lang.reflect.InvocationTargetException
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
>>         ... 19 more
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
>> environments cannot be created in detached mode
>>         at
>> org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
>>         at java.util.Optional.map(Optional.java:215)
>>         at
>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment(ExecutionEnvironment.java:1068)
>>         at
>> org.apache.beam.runners.flink.translation.utils.Workarounds.restoreOriginalStdOutAndStdErrIfApplicable(Workarounds.java:43)
>>         at
>> org.apache.beam.runners.flink.FlinkRunner.<init>(FlinkRunner.java:96)
>>         at
>> org.apache.beam.runners.flink.FlinkRunner.fromOptions(FlinkRunner.java:90)
>>         ... 24 more
>>
>> I've checked the release notes and the issues and couldn't find anything
>> that relates to this. What puzzles me is that the session cluster should be
>> allowed to have multiple environments in detached mode - or am I wrong?
>>
>> Best,
>> Tobi
>>
>

-- 

Tobias Kaymak
Data Engineer
Data Intelligence

[email protected]
www.ricardo.ch
Theilerstrasse 1a, 6300 Zug
