Alice,

I totally agree with what Lukasz said.
Also, as an alternative way to test the job, I suggest installing Flink 
locally and running the Beam pipeline with a single CLI command like this: 
"bin/flink run -c <main_class> /path/to/jar --runner=FlinkRunner".
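
A minimal sketch of that invocation follows. FLINK_HOME and the DRY_RUN 
switch are assumptions for illustration only; the main class and jar name are 
the WordCount values from the quoted commands below, and the jar is assumed to 
be a bundled jar that already contains the Flink runner.

```shell
#!/bin/sh
# Sketch only. FLINK_HOME is a placeholder for wherever Flink is unpacked.
FLINK_HOME="${FLINK_HOME:-.}"
# Values borrowed from the WordCount example quoted in this thread.
MAIN_CLASS="${MAIN_CLASS:-org.apache.beam.examples.WordCount}"
JAR="${JAR:-target/word-count-beam-bundled-0.1.jar}"

submit() {
  # Pipeline options such as --runner go after the jar path.
  set -- "$FLINK_HOME/bin/flink" run -c "$MAIN_CLASS" "$JAR" --runner=FlinkRunner
  if [ "${DRY_RUN:-1}" = "1" ]; then
    # Dry run (default): only print the command that would be executed.
    echo "$@"
  else
    # Real run: hand the job to the local Flink installation.
    "$@"
  fi
}

submit
```

With DRY_RUN=1 (the default in this sketch) the script only prints the 
command; setting DRY_RUN=0 actually submits the job.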

> On 18 Jul 2018, at 17:09, Lukasz Cwik <[email protected]> wrote:
> 
> Yes, you should be able to submit jobs to a Flink master from anywhere you 
> have network connectivity to the Flink master.
> 
> It looks like your job is being submitted to the Flink master and we start 
> waiting for the job to complete but something is causing the job to not 
> complete successfully. Have you tried looking at the Flink master UI or Flink 
> master logs?
> 
> 
> 
> On Fri, Jul 13, 2018 at 6:45 PM Alice Wong <[email protected]> wrote:
> Hello,
> 
> I am a newbie to Beam.
> 
> Following the Beam docs, I am trying to submit the example WordCount to a 
> Flink cluster (one jobmanager and one taskmanager running locally in two 
> linked Docker containers with Maven installed).
> 
> It seems the Beam doc is a bit confusing as to how to submit jobs.
> 
> In https://beam.apache.org/get-started/quickstart-java/, it says I should 
> use
> 
> mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>      -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
>                   --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
> where <flink master> seems to be just a hostname.
> 
> In https://beam.apache.org/documentation/runners/flink/, it says I should use
> 
> $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=/path/to/pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=<flink master url> \
>       --filesToStage=target/word-count-beam--bundled-0.1.jar"
> where I can give localhost:8081 for flinkMaster.
> 
> I have tried running this command both outside Docker and in the jobmanager 
> container (with the exec command). Either way, if I use "localhost" without a 
> port for <flink master url>, it just runs locally and ignores the Flink 
> cluster. If I use "localhost:8081", the command hangs for about 5 seconds and 
> shows the following error messages. It eventually disconnects and dies.
> 
> Could you give me some hints on how Beam jobs are submitted to a Flink 
> cluster in general? Can I do it remotely, from outside the jobmanager node?
> 
> Thanks in advance!
> 
> --------------------------------------
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Executing pipeline using FlinkRunner.
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Translating pipeline to Flink program.
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkExecutionEnvironments 
> createBatchExecutionEnvironment
> INFO: Creating a Batch Execution Environment.
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> enterCompositeTransform
> INFO:  enterCompositeTransform-
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> enterCompositeTransform
> INFO: |    enterCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> visitPrimitiveTransform
> INFO: |   |    visitPrimitiveTransform- ReadLines/Read
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> enterCompositeTransform
> INFO: |    enterCompositeTransform- WordCount.CountWords
> ...
> INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- WriteCounts
> Jul 14, 2018 12:49:48 AM 
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator 
> leaveCompositeTransform
> INFO:  leaveCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment 
> createProgramPlan
> INFO: The job has 0 registered types and 0 default Kryo serializers
> Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource 
> getEstimatedSizeBytes
> INFO: Filepattern pom.xml matched 1 files with total size 10600
> Jul 14, 2018 12:49:49 AM 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils 
> findConnectingAddress
> INFO: Trying to select the network interface and address to use by connecting 
> to the leading JobManager.
> Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils 
> findConnectingAddress
> INFO: TaskManager will try to connect for 10000 milliseconds before falling 
> back to heuristics
> Jul 14, 2018 12:49:49 AM 
> org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener 
> findConnectingAddress
> INFO: Retrieved new target address localhost/127.0.0.1:8081.
> Jul 14, 2018 12:49:49 AM 
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Trying to start actor system at c4342d15c3f4:0
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 
> applyOrElse
> INFO: Slf4jLogger started
> Jul 14, 2018 12:49:50 AM 
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 
> apply$mcV$sp
> INFO: Starting remoting
> Jul 14, 2018 12:49:51 AM 
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 
> apply$mcV$sp
> INFO: Remoting started; listening on addresses 
> :[akka.tcp://flink@c4342d15c3f4:46627]
> Jul 14, 2018 12:49:51 AM 
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
> Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient 
> logAndSysout
> INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting 
> for job completion.
> Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job 
> completion.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor 
> handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId: 
> 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager 
> yet.
> Jul 14, 2018 12:49:51 AM 
> org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-root-0714004948-3ee44b3d 
> (87a12b5471d39a7837d6b0def6d748e2).
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor 
> disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jul 14, 2018 12:49:51 AM 
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 
> apply$mcV$sp
> WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: 
> Adjusted frame length exceeds 10485760: 1213486164 - discarded
> Jul 14, 2018 12:49:51 AM 
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 
> apply$mcV$sp
> WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has 
> failed, address is now gated for [5000] ms. Reason: [Association failed with 
> [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly 
> disassociated (reason unknown).]
> ...
> Jul 14, 2018 12:50:51 AM 
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 
> apply$mcV$sp
> INFO: Remoting shut down.
> Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Couldn't retrieve the JobExecutionResult from the 
> JobManager.
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
>       at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
>       at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
>       at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
> ...
