The solution (in this case) was to swap the Flink binary for one built
against the compatible Scala version.

Looking deeper into the Flink JobManager logs, I found:

2017-06-19 14:07:53,459 ERROR Remoting - scala.Option; local class
incompatible: stream classdesc serialVersionUID = -2062608324514658839,
local class serialVersionUID = -114498752079829388
java.io.InvalidClassException: scala.Option; local class incompatible:
stream classdesc serialVersionUID = -2062608324514658839, local class
serialVersionUID = -114498752079829388
  at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
  ...

This led me to try the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.10,
rather than the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.11 I used
originally. (https://flink.apache.org/downloads.html)

This switch solved the problem (or, at least, I didn't have the problem
after the switch).
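
For anyone who wants to check which Scala version an unpacked Flink binary
was built for: as far as I can tell, the distribution's lib/ directory holds
a jar named like flink-dist_2.10-1.2.1.jar, so the Scala suffix can be read
off the file name. A minimal sketch, assuming that naming convention holds
(the function name is my own and not part of Flink):

```shell
# Print the Scala suffix (e.g. 2.10) embedded in the flink-dist jar name.
# $1: path to the unpacked Flink directory (assumed layout: $1/lib/flink-dist_*.jar)
flink_dist_scala_version() {
  for jar in "$1"/lib/flink-dist_*.jar; do
    # Skip the unexpanded glob pattern when no jar matches.
    [ -e "$jar" ] || continue
    basename "$jar" |
      sed -n 's/^flink-dist_\([0-9][0-9]*\.[0-9][0-9]*\)-.*\.jar$/\1/p'
  done
}
```

Calling it with the Flink directory as the only argument prints the suffix,
or nothing if no matching jar is found.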

This should actually have been evident to me from the mvn dependency:tree
step in the FlinkRunner guide
(https://beam.apache.org/documentation/runners/flink/).

The relevant line from that step says:
[INFO] |  +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime

And the "2.10" is what tips us off to the need for a Flink binary built with
Scala 2.10 instead of Scala 2.11 (I think).

If so, that should be mentioned more explicitly on the FlinkRunner
documentation page.
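
To pull the Scala suffix out of the dependency tree without reading it by
eye, something like the following might help. It is only a sketch: the
function name is made up, and it assumes the Flink artifacts appear as
org.apache.flink:<artifact>_<scala-version>:..., as in the line quoted above.

```shell
# Read `mvn dependency:tree` output on stdin and print each distinct Scala
# suffix found on org.apache.flink artifacts (e.g. 2.10).
flink_scala_suffixes() {
  sed -n 's/.*org\.apache\.flink:[A-Za-z0-9.-]*_\([0-9][0-9]*\.[0-9][0-9]*\):.*/\1/p' |
    sort -u
}
```

Usage, from inside word-count-beam:

    mvn dependency:tree -Pflink-runner | flink_scala_suffixes

If this prints a single version, that is presumably the Scala version the
Flink binary should match.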

Cheers!

On Fri, Jun 16, 2017 at 4:44 AM, Aljoscha Krettek <[email protected]>
wrote:

> Hi Chris,
>
> I just followed your process myself (getting Flink 1.2.1, starting in
> local cluster mode, running Beam word-count Quickstart on cluster) and
> everything worked for me. Could you double check whether the JobManager is
> reachable under the expected address?
>
> On another note, you can also run Beam jobs on Flink with the usual
> bin/flink tool, i.e. to submit as a one-job YARN session or to submit to a
> running YARN Flink cluster:
> bin/flink run -c main-class path/to/jar.jar <program arguments>
>
> Where <program arguments> would be exactly the same arguments that you
> used before.
>
> Best,
> Aljoscha
>
> > On 15. Jun 2017, at 17:44, Chris Hebert <chris.hebert-int@digitalreasoning.com> wrote:
> >
> > Hi,
> >
> > The error is pasted below my procedure.
> >
> >
> > ### My Procedure for Beam on "a long-running Local Flink Cluster":
> >
> > Beam WordCount Quickstart: https://beam.apache.org/get-started/quickstart-java/
> >
> > Run:
> > > cd /user/me/beam
> > > mvn archetype:generate \
> > >     -DarchetypeGroupId=org.apache.beam \
> > >     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> > >     -DarchetypeVersion=2.0.0 \
> > >     -DgroupId=org.example \
> > >     -DartifactId=word-count-beam \
> > >     -Dversion="0.1" \
> > >     -Dpackage=org.apache.beam.examples \
> > >     -DinteractiveMode=false
> >
> > Beam on FlinkRunner Guide: https://beam.apache.org/documentation/runners/flink/
> >
> > Navigate into word-count-beam and identify the appropriate Flink version
> to be 1.2.1:
> > > cd word-count-beam
> > > mvn dependency:tree -Pflink-runner | grep flink
> >
> > Local Flink Cluster Quickstart Guide: https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
> >
> > Follow the Local Flink Cluster Quickstart Guide. The "Apache Flink Web
> Dashboard" opens in a browser showing jobs successfully running and
> completed as I submit them. I keep this running.
> >
> > Back in /user/me/beam/word-count-beam/, run:
> > > mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >         --runner=FlinkRunner \
> > >         --flinkMaster=localhost:6123 \
> > >         --filesToStage=target/word-count-beam-0.1.jar \
> > >         --inputFile=/user/me/beam/word-count-beam/pom.xml \
> > >         --output=/user/me/beam/word-count-beam/output_01" \
> > >     -Pflink-runner
> >
> > The flinkMaster host:port is identified in the JobManager tab of the
> Apache Flink Web Dashboard. Note that the Beam guide says to use
> "--filesToStage=target/word-count-beam-bundled-0.1.jar", but Maven
> actually only builds "target/word-count-beam-0.1.jar".
> >
> > The above command runs until it reaches the errors pasted below. The job
> never makes it onto the Apache Flink Web Dashboard, and no output is
> produced.
> >
> > Note that the following command (under the "Flink-local" tab on the Beam
> > Quickstart Guide) works fine, but it starts its own instance of a Local
> > Flink Cluster. The job never makes it onto the Apache Flink Web Dashboard
> > of the long-standing Local Flink Cluster I set up above. This makes sense,
> > because it doesn't use "-m" to connect to the long-running JobManager.
> > > mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> > >     -Dexec.args=" \
> > >          --runner=FlinkRunner \
> > >          --inputFile=pom.xml \
> > >          --output=counts" \
> > >     -Pflink-runner
> >
> >
> > ### My Procedure for Beam on "a long-running Flink Cluster on YARN":
> >
> > Flink on YARN Setup: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html
> >
> > Same as procedure above, except I am on a YARN cluster with HDFS and
> ZooKeeper, etc.
> >
> > As before, I have the right Flink version, the Apache Flink Web
> Dashboard works, I run the org.apache.beam.examples.WordCount command as
> above with the appropriate flinkMaster host:port as identified from the
> JobManager tab of the Apache Flink Web Dashboard.
> >
> > The command runs until it reaches the errors pasted below. The job never
> makes it onto the Apache Flink Web Dashboard, and no output is produced.
> >
> >
> > ### Both Procedures:
> >
> > In both procedures, I verified that the accompanying DirectRunner and
> > "Flink-local" commands provided in the Beam Quickstart Guide work fine. It
> > is only when I attempt to run a job on a long-running Local Flink Cluster
> > or long-running Flink Cluster on YARN that the below issues occur.
> >
> >
> > ### The Error:
> > ...
> > Jun 15, 2017 9:20:02 AM org.apache.beam.runners.flink.FlinkRunner run
> > INFO: Starting execution of Flink program.
> > ...
> > INFO: Starting remoting
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$
> anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> > INFO: Remoting started; listening on addresses :[akka.tcp://
> [email protected]:54735]
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor
> handleMessage
> > INFO: Received SubmitJobAndWait(JobGraph(jobId:
> 18391da12bb10e38134be676e2bc1002)) but there is no connection to a
> JobManager yet.
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> > INFO: Received job wordcount-chris0hebert-0615142002-7d71a380 (
> 18391da12bb10e38134be676e2bc1002).
> > Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor
> disconnectFromJobManager
> > INFO: Disconnect from JobManager null.
> > Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$
> anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
> >
> > [**************** THIS LOOKS IMPORTANT ******************]
> >
> > WARNING: Association with remote system [akka.tcp://flink@localhost:6123]
> has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> > Jun 15, 2017 9:21:03 AM org.apache.flink.runtime.client.JobClientActor
> terminate
> > ...
> > Jun 15, 2017 9:21:03 AM org.apache.beam.runners.flink.FlinkRunner run
> > SEVERE: Pipeline execution failed
> > org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> > at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:427)
> > ...
> > Caused by: org.apache.flink.runtime.client.
> JobClientActorConnectionTimeoutException: Lost connection to the
> JobManager.
> > at org.apache.flink.runtime.client.JobClientActor.
> handleMessage(JobClientActor.java:207)
> > ...
> > [WARNING]
> > java.lang.reflect.InvocationTargetException
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > ...
> > Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The program execution failed: Couldn't retrieve the JobExecutionResult from
> the JobManager.
> > at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:427)
> > ...
> > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Couldn't retrieve the JobExecutionResult from the JobManager.
> > at org.apache.flink.runtime.client.JobClient.
> awaitJobResult(JobClient.java:294)
> > ...
> > Caused by: org.apache.flink.runtime.client.
> JobClientActorConnectionTimeoutException: Lost connection to the
> JobManager.
> > at org.apache.flink.runtime.client.JobClientActor.
> handleMessage(JobClientActor.java:207)
> > ...
> > [INFO] ------------------------------------------------------------------------
> > [INFO] BUILD FAILURE
> > [INFO] ------------------------------------------------------------------------
> > ...
> > [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java
> (default-cli) on project word-count-beam: An exception occured while
> executing the Java class. null: InvocationTargetException: Pipeline
> execution failed: The program execution failed: Couldn't retrieve the
> JobExecutionResult from the JobManager. Lost connection to the JobManager.
> -> [Help 1]
> > ...
> >
> > The same error occurs when I run the Beam WordCount on the Flink
> > YARN cluster, except that the JobManager address and port mentioned in
> > the "WARNING" are different.
> >
> >
> > ### The Ask:
> >
> > What am I missing?
>
>
