Yes, you should be able to submit jobs to a Flink master from any machine
that has network connectivity to it.
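
As a quick way to confirm that connectivity, here is a minimal Python sketch
that only checks whether a TCP port on the Flink master accepts connections
from the machine you submit from. The function name `can_connect` and any
host and port you pass in are placeholders of mine, not part of Beam or
Flink, and an open port does not by itself prove it is the right port for job
submission.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, calling `can_connect("localhost", 8081)` from the submitting
machine tells you whether that port is reachable from there.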

It looks like your job is being submitted to the Flink master and the client
then starts waiting for it to complete, but something is preventing the job
from completing successfully. Have you looked at the Flink master UI or the
Flink master logs?
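
One detail in the quoted log may be worth checking: the
TooLongFrameException reports a frame length of 1213486164. The sketch below
is my own reading of that number, not something the log states. It assumes
the frame decoder added a 4-byte length prefix to the value it read, and it
decodes the remaining four bytes as ASCII. The helper name
`decode_frame_length` is hypothetical.

```python
import struct


def decode_frame_length(adjusted_length: int, length_field_bytes: int = 4) -> bytes:
    """Recover the raw bytes that were interpreted as a frame length.

    Assumption: the reported "adjusted" length is the raw big-endian
    32-bit length field plus the size of the length field itself.
    """
    raw_length = adjusted_length - length_field_bytes
    return struct.pack(">I", raw_length)


print(decode_frame_length(1213486164))  # b'HTTP'
```

If that reading is right, the Akka client received the start of an HTTP
response, which would suggest that 8081 is the web UI port rather than the
JobManager RPC port (jobmanager.rpc.port in flink-conf.yaml, 6123 by
default). Treat that as a guess to verify against your cluster configuration.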



On Fri, Jul 13, 2018 at 6:45 PM Alice Wong <[email protected]> wrote:

> Hello,
>
> I am a newbie to Beam.
>
> Following the Beam docs, I am trying to submit the example WordCount to a
> Flink cluster (one jobmanager and one taskmanager running locally in two
> linked Docker containers with Maven installed).
>
> It seems the Beam doc is a bit confusing as to how to submit jobs.
>
> In https://beam.apache.org/get-started/quickstart-java/, it mentions I
> should use
>
> mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>      -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
>                   --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
>
> where <flink master> seems to be just a hostname.
>
> In https://beam.apache.org/documentation/runners/flink/, it says I should
> use
>
> $ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=/path/to/pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=<flink master url> \
>       --filesToStage=target/word-count-beam--bundled-0.1.jar"
>
> where I can give localhost:8081 for flinkMaster.
>
> I have tried running this command both outside Docker and in the jobmanager
> container (with the exec command). Either way, if I use "localhost" without
> a port for <flink master url>, it just runs locally and ignores the Flink
> cluster. If I use "localhost:8081", the command hangs for about 5 seconds
> and shows the following error messages. It eventually disconnects and dies.
>
> Could you give me some hints on how Beam jobs are submitted to a Flink
> cluster in general? Can I do it remotely, from outside the jobmanager node?
>
> Thanks in advance!
>
> --------------------------------------
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Executing pipeline using FlinkRunner.
> Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Translating pipeline to Flink program.
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkExecutionEnvironments
> createBatchExecutionEnvironment
> INFO: Creating a Batch Execution Environment.
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO:  enterCompositeTransform-
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO: |    enterCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> visitPrimitiveTransform
> INFO: |   |    visitPrimitiveTransform- ReadLines/Read
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- ReadLines
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> enterCompositeTransform
> INFO: |    enterCompositeTransform- WordCount.CountWords
> ...
> INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO: |    leaveCompositeTransform- WriteCounts
> Jul 14, 2018 12:49:48 AM
> org.apache.beam.runners.flink.FlinkBatchPipelineTranslator
> leaveCompositeTransform
> INFO:  leaveCompositeTransform-
> Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
> INFO: Starting execution of Flink program.
> Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment
> createProgramPlan
> INFO: The job has 0 registered types and 0 default Kryo serializers
> Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource
> getEstimatedSizeBytes
> INFO: Filepattern pom.xml matched 1 files with total size 10600
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: Trying to select the network interface and address to use by
> connecting to the leading JobManager.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
> INFO: TaskManager will try to connect for 10000 milliseconds before
> falling back to heuristics
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener
> findConnectingAddress
> INFO: Retrieved new target address localhost/127.0.0.1:8081.
> Jul 14, 2018 12:49:49 AM
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Trying to start actor system at c4342d15c3f4:0
> Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> applyOrElse
> INFO: Slf4jLogger started
> Jul 14, 2018 12:49:50 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Starting remoting
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting started; listening on addresses
> :[akka.tcp://flink@c4342d15c3f4:46627]
> Jul 14, 2018 12:49:51 AM
> org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
> INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
> Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient
> logAndSysout
> INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting
> for job completion.
> Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for
> job completion.
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
> handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId:
> 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a
> JobManager yet.
> Jul 14, 2018 12:49:51 AM
> org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
> INFO: Received job wordcount-root-0714004948-3ee44b3d
> (87a12b5471d39a7837d6b0def6d748e2).
> Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor
> disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
> apply$mcV$sp
> WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
> Adjusted frame length exceeds 10485760: 1213486164 - discarded
> Jul 14, 2018 12:49:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2
> apply$mcV$sp
> WARNING: Association with remote system [akka.tcp://flink@localhost:8081]
> has failed, address is now gated for [5000] ms. Reason: [Association failed
> with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system
> explicitly disassociated (reason unknown).]
> ...
> Jul 14, 2018 12:50:51 AM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting shut down.
> Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
> ...
>
