Hi Chris,

Thanks for letting us know! I created a PR to fix this in the Flink Runner doc: https://github.com/apache/beam-site/pull/262/files. Do you think this is enough, or should we put a stronger warning?
Best,
Aljoscha

> On 20. Jun 2017, at 16:23, Chris Hebert wrote:
>
> The solution (in this case) was to swap the Flink binary for the one built for the compatible Scala version.
>
> Peering deeper into the Flink JobManager logs, I found:
>
> 2017-06-19 14:07:53,459 ERROR Remoting - scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388
> java.io.InvalidClassException: scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
>     ...
>
> This led me to try the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.10, rather than the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.11 that I used originally (https://flink.apache.org/downloads.html).
>
> This switch solved the problem (or, at least, I didn't have the problem after the switch).
>
> This actually should have been evident to me from the mvn dependency:tree step in the FlinkRunner guide (https://beam.apache.org/documentation/runners/flink/).
>
> The relevant line from that step says:
> [INFO] | +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime
>
> The "2.10" suffix is what tips us off that we need the Flink binary built for Scala 2.10 instead of Scala 2.11 (I think).
>
> If so, that should be mentioned more explicitly on the FlinkRunner documentation page.
>
> Cheers!
>
> On Fri, Jun 16, 2017 at 4:44 AM, Aljoscha Krettek wrote:
> Hi Chris,
>
> I just followed your process myself (getting Flink 1.2.1, starting in local cluster mode, running the Beam word-count Quickstart on the cluster) and everything worked for me.
> Could you double-check whether the JobManager is reachable under the expected address?
>
> On another note, you can also run Beam jobs on Flink with the usual bin/flink tool, e.g. to submit as a one-job YARN session or to submit to a running Flink cluster on YARN:
> bin/flink run -c main-class path/to/jar.jar <program arguments>
>
> where <program arguments> would be exactly the same arguments that you used before.
>
> Best,
> Aljoscha
>
> > On 15. Jun 2017, at 17:44, Chris Hebert wrote:
> >
> > Hi,
> >
> > The error is pasted below my procedure.
> >
> >
> > ### My Procedure for Beam on "a long-running Local Flink Cluster":
> >
> > Beam WordCount Quickstart:
> > https://beam.apache.org/get-started/quickstart-java/
> >
> > Run:
> > > cd /user/me/beam
> > > mvn archetype:generate \
> > >     -DarchetypeGroupId=org.apache.beam \
> > >     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> > >     -DarchetypeVersion=2.0.0 \
> > >     -DgroupId=org.example \
> > >     -DartifactId=word-count-beam \
> > >     -Dversion="0.1" \
> > >     -Dpackage=org.apache.beam.examples \
> > >     -DinteractiveMode=false
> >
> > Beam on FlinkRunner Guide:
> > https://beam.apache.org/documentation/runners/flink/
> >
> > Navigate into word-count-beam and identify the appropriate Flink version, which is 1.2.1:
> > > cd word-count-beam
> > > mvn dependency:tree -Pflink-runner | grep flink
> >
> > Local Flink Cluster Quickstart Guide:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
> >
> > Follow the Local Flink Cluster Quickstart Guide. The "Apache Flink Web Dashboard" opens in a browser, showing jobs successfully running and completing as I submit them. I keep this running.
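The `mvn dependency:tree -Pflink-runner | grep flink` step above is also where the Scala version shows up, as Chris worked out later in the thread: Flink artifacts carry a Scala binary-version suffix such as `_2.10`, and the Flink binary run as the cluster should be the build for that same Scala version. The Java sketch below pulls that suffix out of one line of the tree output. `ScalaSuffix` and `scalaSuffix` are hypothetical names, and the sketch assumes Flink artifact IDs follow the `name_<scala version>` convention seen in the quoted `flink-streaming-java_2.10` line.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reports the Scala binary version of a Flink artifact in `mvn dependency:tree` output. */
public final class ScalaSuffix {

    // Matches "org.apache.flink:<artifactId>_<major>.<minor>:" and captures "<major>.<minor>".
    // Assumption: the Scala version is appended to the artifact ID, as in flink-streaming-java_2.10.
    private static final Pattern FLINK_SCALA_SUFFIX =
            Pattern.compile("org\\.apache\\.flink:[\\w.-]+?_(\\d+\\.\\d+):");

    private ScalaSuffix() {}

    /** Returns e.g. "2.10", or empty if the line has no Scala-suffixed Flink artifact. */
    public static Optional<String> scalaSuffix(String treeLine) {
        Matcher m = FLINK_SCALA_SUFFIX.matcher(treeLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "[INFO] | +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime";
        // Prints "2.10": the Flink binary should then be the Scala 2.10 build, not the 2.11 one.
        System.out.println(scalaSuffix(line).orElse("no Scala suffix found"));
    }
}
```

A regex over the printed tree keeps the sketch dependency-free; it does not resolve the POM itself, so it only sees what `dependency:tree` reports.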
> >
> > Back in /user/me/beam/word-count-beam/, run:
> > > mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >     --runner=FlinkRunner \
> > >     --flinkMaster=localhost:6123 \
> > >     --filesToStage=target/word-count-beam-0.1.jar \
> > >     --inputFile=/user/me/beam/word-count-beam/pom.xml \
> > >     --output=/user/me/beam/word-count-beam/output_01" \
> > >     -Pflink-runner
> >
> > The flinkMaster host:port is identified in the JobManager tab of the Apache Flink Web Dashboard. Note that the Beam guide says to use "--filesToStage=target/word-count-beam-bundled-0.1.jar", but Maven actually only builds "target/word-count-beam-0.1.jar".
> >
> > The above command runs until it reaches the errors pasted below. The job never makes it onto the Apache Flink Web Dashboard, and no output is produced.
> >
> > Note that the following command (under the "Flink-local" tab in the Beam Quickstart Guide) works fine, but it starts its own instance of a Local Flink Cluster. The job never makes it onto the Apache Flink Web Dashboard of the long-running Local Flink Cluster I set up above. This makes sense, because it doesn't point --flinkMaster (the counterpart of bin/flink's "-m") at the long-running JobManager.
> > > mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >     --runner=FlinkRunner \
> > >     --inputFile=pom.xml \
> > >     --output=counts" \
> > >     -Pflink-runner
> >
> >
> > ### My Procedure for Beam on "a long-running Flink Cluster on YARN":
> >
> > Flink on YARN Setup:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html
> >
> > Same as the procedure above, except I am on a YARN cluster with HDFS, ZooKeeper, etc.
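Aljoscha's first question was whether the JobManager is reachable at the expected address, i.e. the host:port passed as `--flinkMaster`. A minimal Java sketch of a plain TCP check follows; `JobManagerProbe` and `isReachable` are hypothetical names, and the default `localhost:6123` is only the address used in the command above. Its limits matter here: a successful connect shows only that something is listening. Judging by the JobManager log Chris found later, the connection in this thread did reach the JobManager, and the Akka association failed afterwards because of the Scala version mismatch, so a check like this would have passed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether anything accepts TCP connections at a "host:port" address. */
public final class JobManagerProbe {

    private JobManagerProbe() {}

    public static boolean isReachable(String hostPort, int timeoutMillis) {
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        try (Socket socket = new Socket()) {
            // Plain TCP connect with a timeout; says nothing about Akka or Scala compatibility.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same form as the --flinkMaster value; localhost:6123 is the address used above.
        String master = args.length > 0 ? args[0] : "localhost:6123";
        System.out.println(master + " reachable: " + isReachable(master, 2000));
    }
}
```

With Java 11 or later it can be run directly from source, e.g. `java JobManagerProbe.java localhost:6123`, substituting the host:port shown in the dashboard's JobManager tab.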
> >
> > As before, I have the right Flink version and the Apache Flink Web Dashboard works. I run the org.apache.beam.examples.WordCount command as above, with the appropriate flinkMaster host:port as identified from the JobManager tab of the Apache Flink Web Dashboard.
> >
> > The command runs until it reaches the errors pasted below. The job never makes it onto the Apache Flink Web Dashboard, and no output is produced.
> >
> >
> > ### Both Procedures:
> >
> > In both procedures, I tested that the accompanying DirectRunner and "Flink-local" commands provided in the Beam Quickstart Guide work fine. It is only when I attempt to run a job on a long-running Local Flink Cluster or a long-running Flink Cluster on YARN that the issues below occur.
> >
> >
> > ### The Error:
> > ...
> > Jun 15, 2017 9:20:02 AM org.apache.beam.runners.flink.FlinkRunner run
> > INFO: Starting execution of Flink program.
> > ...
> > INFO: Starting remoting
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> > INFO: Remoting started; listening on addresses :[akka.tcp://[email protected]:54735]
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor handleMessage
> > INFO: Received SubmitJobAndWait(JobGraph(jobId: 18391da12bb10e38134be676e2bc1002)) but there is no connection to a JobManager yet.
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> > INFO: Received job wordcount-chris0hebert-0615142002-7d71a380 (18391da12bb10e38134be676e2bc1002).
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
> > INFO: Disconnect from JobManager null.
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> >
> > [**************** THIS LOOKS IMPORTANT ******************]
> >
> > WARNING: Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> > Jun 15, 2017 9:21:03 AM org.apache.flink.runtime.client.JobClientActor terminate
> > ...
> > Jun 15, 2017 9:21:03 AM org.apache.beam.runners.flink.FlinkRunner run
> > SEVERE: Pipeline execution failed
> > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> >     ...
> > Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
> >     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> >     ...
> > [WARNING]
> > java.lang.reflect.InvocationTargetException
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     ...
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> >     ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
> >     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
> >     ...
> > Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
> >     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> >     ...
> > [INFO] ------------------------------------------------------------------------
> > [INFO] BUILD FAILURE
> > [INFO] ------------------------------------------------------------------------
> > ...
> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project word-count-beam: An exception occured while executing the Java class. null: InvocationTargetException: Pipeline execution failed: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager. Lost connection to the JobManager. -> [Help 1]
> > ...
> >
> > The same error occurs when I run the Beam WordCount on the Flink YARN cluster, except that the JobManager address and port mentioned in the "WARNING" are, of course, different.
> >
> >
> > ### The Ask:
> >
> > What am I missing?
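The root cause turned out to be the `java.io.InvalidClassException` for `scala.Option` that Chris found in the JobManager log: the client and the cluster were built against different Scala versions, so the same class name carried a different `serialVersionUID` on each side, and the JobManager most likely rejected the incoming message while the client only saw the association drop ("Disassociated") and then "Lost connection to the JobManager". The self-contained Java sketch below reproduces that failure mode with a hypothetical stand-in class `Payload` (not `scala.Option`) by patching the UID recorded in a serialized stream, which mimics bytes written by a different build of the same class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch: a serialVersionUID mismatch makes Java deserialization fail with InvalidClassException. */
public final class SerialUidDemo {

    /** Hypothetical stand-in for a class such as scala.Option; not the real Scala class. */
    public static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String value;

        public Payload(String value) {
            this.value = value;
        }
    }

    /** Serializes obj, then overwrites the serialVersionUID recorded in the stream,
     *  mimicking bytes written by a different build of the same class. */
    public static byte[] serializeWithUid(Serializable obj, long uid) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] bytes = bos.toByteArray();
        byte[] name = obj.getClass().getName().getBytes(StandardCharsets.UTF_8);
        int at = indexOf(bytes, name);
        if (at < 0) {
            throw new IllegalStateException("class descriptor not found in stream");
        }
        // In the serialization stream format, the 8-byte UID directly follows the class name.
        ByteBuffer.wrap(bytes, at + name.length, Long.BYTES).putLong(uid);
        return bytes;
    }

    /** Deserializes a Payload, or returns the InvalidClassException message on a UID mismatch. */
    public static String read(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ((Payload) in.readObject()).value;
        } catch (InvalidClassException e) {
            return "InvalidClassException: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        Payload p = new Payload("hello");
        System.out.println(read(serializeWithUid(p, 1L)));   // UIDs match: prints "hello"
        System.out.println(read(serializeWithUid(p, 42L)));  // UIDs differ: "...local class incompatible..."
    }
}
```

The second call fails with the same "local class incompatible: stream classdesc serialVersionUID = ..., local class serialVersionUID = ..." wording as the quoted JobManager log, because the check compares only the class name and UID before any field data is read.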
