Hi,

We have a Kerberos-secured YARN cluster on which we run, among many other
things, our Apache Flink applications.
I'm having trouble getting even the simplest Beam application to run in
this setup using Apache Flink as the runner.

For these applications we effectively run a command similar to this:

flink run -m yarn-cluster          \
    --yarnstreaming                \
    --yarncontainer         1      \
    --yarnslots             10     \
    --yarnjobManagerMemory  3100   \
    --yarntaskManagerMemory 3100   \
    --yarnname "My application"    \
    my-application.jar

Our flink-conf.yaml contains a few settings (port numbers and such) that are
needed to make everything work.

To get started, I simply wanted to run the basic WordCount example on our
cluster, just to get the plumbing of submitting a job right.

I start with the Java Quickstart
(https://beam.apache.org/get-started/quickstart-java/):

 mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.0.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false


To make sure the jar knows its main class, I make this change to pom.xml:

diff --git pom.xml pom.xml
index bcd9258..201d48f 100644
--- pom.xml
+++ pom.xml
@@ -110,6 +110,9 @@
               </filters>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.beam.examples.WordCount</mainClass>
+                </transformer>
               </transformers>
             </configuration>
           </execution>


Now I build it with the flink-runner profile enabled:

mvn clean package -Pflink-runner

In the output I see

     [INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
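To double-check that the ManifestResourceTransformer change actually ended up in the built jar, the Main-Class entry of its manifest can be read back. A minimal sketch using only java.util.jar from the JDK; MainClassCheck is a hypothetical helper name, and the default path target/word-count-beam-0.1.jar is simply the one produced by the build above:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical helper: reads the Main-Class recorded in a jar's manifest. */
final class MainClassCheck {

    /** Returns the Main-Class attribute of the jar, or null if the jar has
     *  no manifest or the manifest has no Main-Class entry. */
    static String mainClassOf(File jar) throws IOException {
        try (JarFile jf = new JarFile(jar)) {
            Manifest manifest = jf.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        File jar = new File(args.length > 0 ? args[0] : "target/word-count-beam-0.1.jar");
        // After the pom.xml change this should print org.apache.beam.examples.WordCount
        System.out.println("Main-Class: " + mainClassOf(jar));
    }
}
```

The same information is available from the shell with `unzip -p target/word-count-beam-0.1.jar META-INF/MANIFEST.MF`.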

So I downloaded exactly the same Flink and Scala versions (Flink 1.2.1,
Scala 2.10), built for Hadoop 2.7.x, which is what we run:

http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz

I created a helloworld.txt file and put it on HDFS.

I then run it using:

./flink-1.2.1/bin/flink run \
    -m yarn-cluster \
    --yarncontainer         4 \
    --yarnslots             4 \
    --yarnjobManagerMemory  2000 \
    --yarntaskManagerMemory 2000 \
    --yarnname "Word count on Beam on Flink on Yarn" \
    target/word-count-beam-0.1.jar \
    --runner=FlinkRunner \
    --flinkMaster=[auto] \
    --filesToStage=target/word-count-beam-0.1.jar \
    --inputFile=helloworld.txt \
    --output=hello-counts

The job is submitted and then fails with:

Starting execution of program
2017-07-19 10:29:45,664 INFO  org.apache.flink.yarn.YarnClusterClient                 - Starting program in interactive mode

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
    at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
    at com.sun.proxy.$Proxy45.as(Unknown Source)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
    at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
    at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
    at org.apache.beam.examples.WordCount.main(WordCount.java:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
2017-07-19 10:29:46,362 INFO  org.apache.flink.yarn.YarnClusterClient                 - Sending shutdown request to the Application Master
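A NoSuchMethodError (as opposed to a NoClassDefFoundError) means that some org.joda.time.Duration class was found at runtime, but not one with the static millis(long) method that GcsUtil calls from its static initializer. If I remember correctly, that method only exists since Joda-Time 2.0, so an older joda-time somewhere on the client classpath may be shadowing the 2.4 copy in the shaded jar. The sketch below reports which jar the class is really loaded from and whether the method is present. ClasspathCheck is a hypothetical helper; it uses only JDK reflection (no compile-time dependency on joda-time), and it has to run on the same classpath as the failing program, for example by calling it from main() before Pipeline.create:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic helper: where does a class come from, and does it
 *  have a given public static method? */
final class ClasspathCheck {

    /** Returns the jar or directory the class was loaded from, "bootstrap"
     *  for JDK classes (which have no code source), or null if not found. */
    static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className, false,
                    ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "bootstrap";
            }
            URL location = source.getLocation();
            return location.toString();
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    /** True only if the class exists and has a public static method with
     *  exactly this name and these parameter types. */
    static boolean hasStaticMethod(String className, String methodName,
                                   Class<?>... parameterTypes) {
        try {
            Class<?> cls = Class.forName(className, false,
                    ClasspathCheck.class.getClassLoader());
            Method m = cls.getMethod(methodName, parameterTypes);
            return Modifier.isStatic(m.getModifiers());
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String duration = "org.joda.time.Duration";
        System.out.println(duration + " loaded from: " + locationOf(duration));
        // false here would point at an older joda-time winning on the classpath
        System.out.println("Duration.millis(long) present: "
                + hasStaticMethod(duration, "millis", long.class));
    }
}
```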



Yet during the build I saw

     [INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.

and the class is present in the jar:

$ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
     5290  2017-07-19 12:21   com/google/appengine/repackaged/org/joda/time/Duration.class
     3748  2017-07-19 12:21   org/joda/time/Duration.class
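That listing only covers the application jar. The Flink client that runs main() also has the jars from the distribution's lib/ directory on its classpath (and, depending on the environment, Hadoop jars), and if the user jar is loaded parent-first, which I believe is how Flink 1.2 behaves, a joda-time inside one of those jars would win. A small sketch for scanning a directory of jars for the same entry; JarScan and the default directory flink-1.2.1/lib are assumptions for illustration, and only the top level of the directory is scanned:

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.JarFile;

/** Hypothetical helper: lists the jars in a directory that contain an entry. */
final class JarScan {

    /** Returns the paths of all *.jar files directly inside {@code dir} that
     *  contain {@code entryName}, e.g. "org/joda/time/Duration.class". */
    static List<String> jarsContaining(File dir, String entryName) throws IOException {
        List<String> hits = new ArrayList<>();
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return hits; // not a directory, or not readable
        }
        Arrays.sort(jars); // deterministic output order
        for (File jar : jars) {
            try (JarFile jf = new JarFile(jar)) {
                if (jf.getEntry(entryName) != null) {
                    hits.add(jar.getPath());
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        File dir = new File(args.length > 0 ? args[0] : "flink-1.2.1/lib");
        for (String jar : jarsContaining(dir, "org/joda/time/Duration.class")) {
            System.out.println(jar);
        }
    }
}
```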

What am I doing wrong?
All help is appreciated!

Thanks.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
