Hi Chris,

Thanks for letting us know! I created a PR to fix this in the Flink Runner doc:
https://github.com/apache/beam-site/pull/262/files

Do you think this is enough, or should we add a stronger warning?

Best,
Aljoscha

> On 20. Jun 2017, at 16:23, Chris Hebert 
> <[email protected]> wrote:
> 
> The solution (in this case) was to swap the Flink binary for one built for the
> compatible Scala version.
> 
> Peering deeper into the Flink JobManager logs I found:
> 
> 2017-06-19 14:07:53,459 ERROR Remoting - scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388
> java.io.InvalidClassException: scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388
>   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>   ...
> 
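The InvalidClassException above is Java serialization's class-version check: the serialVersionUID recorded in the incoming stream for scala.Option did not match the one computed for the scala.Option class loaded by the JobManager, which typically happens when the two sides carry different Scala library versions. As a rough diagnostic sketch, assuming you can run a small class on each classpath (SerialUidCheck is a hypothetical name, not part of Flink or Beam), java.io.ObjectStreamClass reports the locally computed value:

```java
import java.io.ObjectStreamClass;

/** Hypothetical diagnostic: prints the serialVersionUID this JVM computes for a class. */
public class SerialUidCheck {

    /** Returns the local serialVersionUID, or null if the class is not Serializable. */
    static Long localSerialVersionUid(Class<?> cls) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        return desc == null ? null : desc.getSerialVersionUID();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults to scala.Option, the class named in the JobManager log;
        // the scala-library jar must be on the classpath for this lookup to succeed.
        String className = args.length > 0 ? args[0] : "scala.Option";
        Class<?> cls = Class.forName(className);
        System.out.println(className + " serialVersionUID = " + localSerialVersionUid(cls));
    }
}
```

Running it once with the client's dependencies on the classpath and once with the jars from Flink's lib/ directory should print different values for scala.Option if the Scala versions disagree.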
> This led me to try the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.10,
> rather than the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.11 I used
> originally. (https://flink.apache.org/downloads.html)
> 
> This switch solved the problem (or, at least, I didn't have the problem after 
> the switch).
> 
> This actually should have been evident to me from the mvn dependency:tree
> step in the FlinkRunner guide
> (https://beam.apache.org/documentation/runners/flink/).
> 
> The relevant line from that step says:
> [INFO] |  +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime
> 
> The "_2.10" suffix is what tips us off that we need a Flink build for Scala
> 2.10 instead of Scala 2.11 (I think).
> 
> If so, that should be mentioned more explicitly on the FlinkRunner 
> documentation page.
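The "_2.10" / "_2.11" suffix on an artifact ID is the usual Maven convention for the Scala binary version a library was built against. A minimal sketch of pulling that suffix out of the dependency tree, assuming the line format shown above (ScalaSuffixFinder and its regular expression are hypothetical, not part of Maven or Beam):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: lists the Scala suffixes of Flink artifacts in "mvn dependency:tree" output. */
public class ScalaSuffixFinder {

    // Matches coordinates such as "org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime"
    // and captures the Scala binary version after the underscore ("2.10").
    // Artifacts without a suffix (e.g. "org.apache.flink:flink-core:jar:...") do not match.
    private static final Pattern FLINK_ARTIFACT =
            Pattern.compile("org\\.apache\\.flink:[\\w.-]+?_(\\d+\\.\\d+):");

    static Set<String> scalaVersions(List<String> treeLines) {
        Set<String> versions = new LinkedHashSet<>();
        for (String line : treeLines) {
            Matcher m = FLINK_ARTIFACT.matcher(line);
            while (m.find()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    public static void main(String[] args) throws IOException {
        // Reads the dependency tree from stdin, e.g. piped from "mvn dependency:tree -Pflink-runner".
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            List<String> lines = in.lines().collect(Collectors.toList());
            System.out.println("Flink artifacts built for Scala: " + scalaVersions(lines));
        }
    }
}
```

For a tree like the one above, piping mvn dependency:tree -Pflink-runner into the compiled class should report [2.10], which is the Scala version to look for when picking a Flink binary from the downloads page.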
> 
> Cheers!
> 
> On Fri, Jun 16, 2017 at 4:44 AM, Aljoscha Krettek <[email protected]> wrote:
> Hi Chris,
> 
> I just followed your process myself (getting Flink 1.2.1, starting in local
> cluster mode, running the Beam WordCount Quickstart on the cluster) and
> everything worked for me. Could you double-check whether the JobManager is
> reachable at the expected address?
> 
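A quick way to check basic reachability is to open a plain TCP connection to the host:port passed as --flinkMaster. This is a hedged sketch (JobManagerPing is a hypothetical name; the defaults localhost and 6123 come from the --flinkMaster=localhost:6123 value in the original command). A successful connection only shows that something is listening on that port; it would not have caught the Scala-version mismatch found later in this thread, since that failure appears to have happened after the connection was established.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP connection to the JobManager's RPC address succeeds. */
public class JobManagerPing {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the --flinkMaster value used in this thread; pass host and port to override.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6123;
        boolean ok = isReachable(host, port, 2000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```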
> On another note, you can also run Beam jobs on Flink with the usual bin/flink
> tool, e.g. to submit as a one-job YARN session or to submit to a running
> Flink cluster on YARN:
> bin/flink run -c main-class path/to/jar.jar <program arguments>
> 
> where <program arguments> are exactly the same arguments you used before.
> 
> Best,
> Aljoscha
> 
> > On 15. Jun 2017, at 17:44, Chris Hebert
> > <[email protected]> wrote:
> >
> > Hi,
> >
> > The error is pasted below my procedure.
> >
> >
> > ### My Procedure for Beam on "a long-running Local Flink Cluster":
> >
> > Beam WordCount Quickstart: 
> > https://beam.apache.org/get-started/quickstart-java/ 
> >
> > Run:
> > > cd /user/me/beam
> > > mvn archetype:generate \
> > >     -DarchetypeGroupId=org.apache.beam \
> > >     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> > >     -DarchetypeVersion=2.0.0 \
> > >     -DgroupId=org.example \
> > >     -DartifactId=word-count-beam \
> > >     -Dversion="0.1" \
> > >     -Dpackage=org.apache.beam.examples \
> > >     -DinteractiveMode=false
> >
> > Beam on FlinkRunner Guide: 
> > https://beam.apache.org/documentation/runners/flink/ 
> >
> > Navigate into word-count-beam and identify the appropriate Flink version
> > (1.2.1):
> > > cd word-count-beam
> > > mvn dependency:tree -Pflink-runner | grep flink
> >
> > Local Flink Cluster Quickstart Guide: 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
> >
> > Follow the Local Flink Cluster Quickstart Guide. The "Apache Flink Web 
> > Dashboard" opens in a browser showing jobs successfully running and 
> > completed as I submit them. I keep this running.
> >
> > Back in /user/me/beam/word-count-beam/, run:
> > > mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >         --runner=FlinkRunner \
> > >         --flinkMaster=localhost:6123 \
> > >         --filesToStage=target/word-count-beam-0.1.jar \
> > >         --inputFile=/user/me/beam/word-count-beam/pom.xml \
> > >         --output=/user/me/beam/word-count-beam/output_01" \
> > >     -Pflink-runner
> >
> > The flinkMaster host:port is identified in the JobManager tab of the Apache 
> > Flink Web Dashboard. Note that the Beam guide says to use 
> > "--filesToStage=target/word-count-beam-bundled-0.1.jar", but Maven actually 
> > only builds "target/word-count-beam-0.1.jar".
> >
> > The above command runs until it reaches the errors pasted below. The job 
> > never makes it onto the Apache Flink Web Dashboard, and no output is 
> > produced.
> >
> > Note that the following command (under the "Flink-local" tab in the Beam
> > Quickstart Guide) works fine, but it starts its own instance of a Local
> > Flink Cluster. The job never makes it onto the Apache Flink Web Dashboard
> > of the long-running Local Flink Cluster I set up above. This makes sense,
> > because it doesn't use --flinkMaster (or "-m") to connect to the
> > long-running JobManager.
> > > mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >          --runner=FlinkRunner \
> > >          --inputFile=pom.xml \
> > >          --output=counts" \
> > >     -Pflink-runner
> >
> >
> > ### My Procedure for Beam on "a long-running Flink Cluster on YARN":
> >
> > Flink on YARN Setup: 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html
> >
> > Same as the procedure above, except that I am on a YARN cluster with HDFS,
> > ZooKeeper, etc.
> >
> > As before, I have the right Flink version, the Apache Flink Web Dashboard
> > works, and I run the org.apache.beam.examples.WordCount command as above
> > with the appropriate flinkMaster host:port, as identified from the
> > JobManager tab of the Apache Flink Web Dashboard.
> >
> > The command runs until it reaches the errors pasted below. The job never 
> > makes it onto the Apache Flink Web Dashboard, and no output is produced.
> >
> >
> > ### Both Procedures:
> >
> > In both procedures, I verified that the accompanying DirectRunner and
> > "Flink-local" commands provided in the Beam Quickstart Guide work fine. It
> > is only when I attempt to run a job on a long-running Local Flink Cluster
> > or a long-running Flink Cluster on YARN that the issues below occur.
> >
> >
> > ### The Error:
> > ...
> > Jun 15, 2017 9:20:02 AM org.apache.beam.runners.flink.FlinkRunner run
> > INFO: Starting execution of Flink program.
> > ...
> > INFO: Starting remoting
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> > INFO: Remoting started; listening on addresses :[akka.tcp://[email protected]:54735]
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor handleMessage
> > INFO: Received SubmitJobAndWait(JobGraph(jobId: 18391da12bb10e38134be676e2bc1002)) but there is no connection to a JobManager yet.
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> > INFO: Received job wordcount-chris0hebert-0615142002-7d71a380 (18391da12bb10e38134be676e2bc1002).
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
> > INFO: Disconnect from JobManager null.
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> >
> > [**************** THIS LOOKS IMPORTANT ******************]
> >
> > WARNING: Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> > Jun 15, 2017 9:21:03 AM org.apache.flink.runtime.client.JobClientActor terminate
> > ...
> > Jun 15, 2017 9:21:03 AM org.apache.beam.runners.flink.FlinkRunner run
> > SEVERE: Pipeline execution failed
> > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
> > at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> > ...
> > [WARNING]
> > java.lang.reflect.InvocationTargetException
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > ...
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
> > at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
> > at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> > ...
> > [INFO] ------------------------------------------------------------------------
> > [INFO] BUILD FAILURE
> > [INFO] ------------------------------------------------------------------------
> > ...
> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project word-count-beam: An exception occured while executing the Java class. null: InvocationTargetException: Pipeline execution failed: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager. Lost connection to the JobManager. -> [Help 1]
> > ...
> >
> > The same error occurs when I run the Beam WordCount on the Flink
> > YARN cluster, except that the JobManager address and port in the
> > "WARNING" are obviously different.
> >
> >
> > ### The Ask:
> >
> > What am I missing?
> 
> 
