Hi Chris,

I just followed your process myself (getting Flink 1.2.1, starting in local cluster mode, running Beam word-count Quickstart on cluster) and everything worked for me. Could you double-check whether the JobManager is reachable under the expected address?
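One way to run that reachability check is sketched here. It assumes bash and the coreutils timeout command are available and tries to open a TCP connection to the host:port shown in the JobManager tab (localhost:6123 in the quoted command). The function name jobmanager_reachable and the FLINK_HOST/FLINK_PORT variables are made up for illustration. An open port only shows that something is listening there; it does not prove that the JobManager accepts the client, so treat this as a first check only.

```shell
#!/usr/bin/env bash
# Hypothetical helper: returns 0 if a TCP connection to host:port can be
# opened within 5 seconds, non-zero otherwise.
jobmanager_reachable() {
  local host="$1" port="$2"
  # /dev/tcp/<host>/<port> is a bash redirection feature, so the probe runs
  # in an explicit bash child; timeout bounds the attempt.
  timeout 5 bash -c 'exec 3<>/dev/tcp/$1/$2' _ "$host" "$port" 2>/dev/null
}

# Assumed values, taken from the --flinkMaster argument in the quoted command.
FLINK_HOST="localhost"
FLINK_PORT="6123"

if jobmanager_reachable "$FLINK_HOST" "$FLINK_PORT"; then
  echo "TCP port ${FLINK_HOST}:${FLINK_PORT} is open"
else
  echo "TCP port ${FLINK_HOST}:${FLINK_PORT} is NOT reachable"
fi
```

If the port is open but the submission still fails, it may be worth comparing the host name passed to --flinkMaster with the address the JobManager reports in the dashboard, since the two may need to match exactly.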

On another note, you can also run Beam jobs on Flink with the usual bin/flink tool, i.e. to submit as a one-job YARN session or to submit to a running YARN Flink cluster:

    bin/flink run -c main-class path/to/jar.jar <program arguments>

where <program arguments> would be exactly the same arguments that you used before.

Best,
Aljoscha

> On 15. Jun 2017, at 17:44, Chris Hebert <[email protected]> wrote:
>
> Hi,
>
> The error is pasted below my procedure.
>
>
> ### My Procedure for Beam on "a long-running Local Flink Cluster":
>
> Beam WordCount Quickstart:
> https://beam.apache.org/get-started/quickstart-java/
>
> Run:
>
>   cd /user/me/beam
>   mvn archetype:generate \
>       -DarchetypeGroupId=org.apache.beam \
>       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
>       -DarchetypeVersion=2.0.0 \
>       -DgroupId=org.example \
>       -DartifactId=word-count-beam \
>       -Dversion="0.1" \
>       -Dpackage=org.apache.beam.examples \
>       -DinteractiveMode=false
>
> Beam on FlinkRunner Guide:
> https://beam.apache.org/documentation/runners/flink/
>
> Navigate into word-count-beam and identify the appropriate Flink version to
> be 1.2.1:
>
>   cd word-count-beam
>   mvn dependency:tree -Pflink-runner | grep flink
>
> Local Flink Cluster Quickstart Guide:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
>
> Follow the Local Flink Cluster Quickstart Guide. The "Apache Flink Web
> Dashboard" opens in a browser showing jobs successfully running and completed
> as I submit them. I keep this running.
>
> Back in /user/me/beam/word-count-beam/, run:
>
>   mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>       -Dexec.args=" \
>       --runner=FlinkRunner \
>       --flinkMaster=localhost:6123 \
>       --filesToStage=target/word-count-beam-0.1.jar \
>       --inputFile=/user/me/beam/word-count-beam/pom.xml \
>       --output=/user/me/beam/word-count-beam/output_01" \
>       -Pflink-runner
>
> The flinkMaster host:port is identified in the JobManager tab of the Apache
> Flink Web Dashboard. Note that the Beam guide says to use
> "--filesToStage=target/word-count-beam-bundled-0.1.jar", but Maven actually
> only builds "target/word-count-beam-0.1.jar".
>
> The above command runs until it reaches the errors pasted below. The job
> never makes it onto the Apache Flink Web Dashboard, and no output is produced.
>
> Note that the following command (under the "Flink-local" tab on the Beam
> Quickstart Guide) works fine, but it starts its own instance of a Local
> Flink Cluster. The job never makes it onto the Apache Flink Web Dashboard of
> the long-standing Local Flink Cluster I set up above. This makes sense,
> because it doesn't use "-m" to connect to the long-running JobManager.
>
>   mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>       -Dexec.args=" \
>       --runner=FlinkRunner \
>       --inputFile=pom.xml \
>       --output=counts" \
>       -Pflink-runner
>
>
> ### My Procedure for Beam on "a long-running Flink Cluster on YARN":
>
> Flink on YARN Setup:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html
>
> Same as procedure above, except I am on a YARN cluster with HDFS and
> ZooKeeper, etc.
>
> As before, I have the right Flink version, the Apache Flink Web Dashboard
> works, I run the org.apache.beam.examples.WordCount command as above with the
> appropriate flinkMaster host:port as identified from the JobManager tab of
> the Apache Flink Web Dashboard.
>
> The command runs until it reaches the errors pasted below.
> The job never makes it onto the Apache Flink Web Dashboard, and no output is
> produced.
>
>
> ### Both Procedures:
>
> In both procedures, I verified that the accompanying DirectRunner and
> "Flink-local" commands provided in the Beam Quickstart Guide work fine. It is
> only when I attempt to run a job on a long-running Local Flink Cluster or
> long-running Flink Cluster on YARN that the below issues occur.
>
>
> ### The Error:
> ...
> Jun 15, 2017 9:20:02 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> ...
> INFO: Starting remoting
> Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
> INFO: Remoting started; listening on addresses :[akka.tcp://[email protected]:54735]
> Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId: 18391da12bb10e38134be676e2bc1002)) but there is no connection to a JobManager yet.
> Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-chris0hebert-0615142002-7d71a380 (18391da12bb10e38134be676e2bc1002).
> Jun 15, 2017 9:20:03 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jun 15, 2017 9:20:03 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
>
> [**************** THIS LOOKS IMPORTANT ******************]
>
> WARNING: Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> Jun 15, 2017 9:21:03 AM org.apache.flink.runtime.client.JobClientActor terminate
> ...
> Jun 15, 2017 9:21:03 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> ...
> Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> ...
> [WARNING]
> java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ...
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> ...
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
> ...
> Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
> ...
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> ...
> [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project word-count-beam: An exception occured while executing the Java class. null: InvocationTargetException: Pipeline execution failed: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager. Lost connection to the JobManager. -> [Help 1]
> ...
>
> The same error occurs when I run the Beam WordCount on the Flink
> YARN-Cluster, except obviously my JobManager's address and port are different
> when mentioned in the "WARNING".
>
>
> ### The Ask:
>
> What am I missing?
