Hi,
We have a Kerberos-secured YARN cluster on which we run, among many other
things, our Apache Flink applications.
I'm having trouble getting even the simplest Beam application to run in
this setup using Apache Flink as the runner.
For our existing Flink applications we run a command similar to this:
flink run -m yarn-cluster \
--yarnstreaming \
--yarncontainer 1 \
--yarnslots 10 \
--yarnjobManagerMemory 3100 \
--yarntaskManagerMemory 3100 \
--yarnname "My application" \
my-application.jar
In the flink-conf.yaml there are a few settings needed to make everything
work (port numbers and such).
To get started I wanted to simply run the base 'word count' example on our
cluster to get the plumbing of running a job right.
I started with the Java Quickstart
(https://beam.apache.org/get-started/quickstart-java/):
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.0.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
To make sure the jar's manifest names the main class, I changed the pom.xml like this:
diff --git pom.xml pom.xml
index bcd9258..201d48f 100644
--- pom.xml
+++ pom.xml
@@ -110,6 +110,9 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.beam.examples.WordCount</mainClass>
+ </transformer>
</transformers>
</configuration>
</execution>
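To confirm that the shaded jar really ends up with the expected Main-Class entry, a small check like the one below could be used. This is only a minimal sketch: the class name ManifestCheck and its helper methods are hypothetical, and the default path assumes the jar name produced by the archetype settings above.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper, not part of Beam or Flink.
public class ManifestCheck {

    /** Returns the Main-Class attribute of a manifest, or null if it is absent. */
    static String mainClassOf(Manifest manifest) {
        if (manifest == null) {
            return null; // the jar has no META-INF/MANIFEST.MF
        }
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    /** Opens the jar at the given path and reads its Main-Class attribute. */
    static String mainClassOfJar(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return mainClassOf(jar.getManifest());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the jar built by the quickstart with version 0.1.
        String path = args.length > 0 ? args[0] : "target/word-count-beam-0.1.jar";
        System.out.println("Main-Class: " + mainClassOfJar(path));
    }
}
```

With the transformer from the diff in place, I would expect this to print org.apache.beam.examples.WordCount for the built jar.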
Now I build this with the flink-runner profile enabled:
mvn clean package -Pflink-runner
In the output I see:
[INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
So I downloaded the exact same Flink/Scala version, built for Hadoop 2.7.x (which is what we run):
http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz
I created a helloworld.txt, which I put on HDFS.
I then ran the job using:
./flink-1.2.1/bin/flink run \
-m yarn-cluster \
--yarncontainer 4 \
--yarnslots 4 \
--yarnjobManagerMemory 2000 \
--yarntaskManagerMemory 2000 \
--yarnname "Word count on Beam on Flink on Yarn" \
target/word-count-beam-0.1.jar \
--runner=FlinkRunner \
--flinkMaster=[auto] \
--filesToStage=target/word-count-beam-0.1.jar \
--inputFile=helloworld.txt \
--output=hello-counts
The job gets submitted and then fails with:
Starting execution of program
2017-07-19 10:29:45,664 INFO org.apache.flink.yarn.YarnClusterClient - Starting program in interactive mode
------------------------------------------------------------
The program finished with the following exception:
java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
at com.sun.proxy.$Proxy45.as(Unknown Source)
at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
at org.apache.beam.examples.WordCount.main(WordCount.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
2017-07-19 10:29:46,362 INFO org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
Yet during the build I saw:
[INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.
and the jar does contain the class:
$ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
5290 2017-07-19 12:21 com/google/appengine/repackaged/org/joda/time/Duration.class
3748 2017-07-19 12:21 org/joda/time/Duration.class
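Since the class is in the jar, one thing worth checking is which copy of org.joda.time.Duration is actually picked up at runtime. A NoSuchMethodError like this can occur when a different copy of the class is found earlier on the classpath, though I have not confirmed that this is the case here. The sketch below is a hypothetical helper (the name WhichJar and its methods are my own, not from Beam or Flink) that prints where a class was loaded from and whether it has a public millis(long) method. It would need to be run with the same classpath that the Flink client uses.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Beam or Flink.
public class WhichJar {

    /** Returns the location a class was loaded from, or a short note if it is unknown. */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK itself may have no code source.
            if (src == null || src.getLocation() == null) {
                return "(bootstrap/unknown)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    /** Returns true if the class is loadable and has a public method with this signature. */
    static boolean hasMethod(String className, String method, Class<?>... params) {
        try {
            Class.forName(className).getMethod(method, params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.joda.time.Duration";
        System.out.println(name + " loaded from: " + locate(name));
        System.out.println("has millis(long): " + hasMethod(name, "millis", long.class));
    }
}
```

If the printed location is not target/word-count-beam-0.1.jar, then another jar on the classpath is supplying the class.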
What am I doing wrong?
All help is appreciated!
Thanks.
--
Best regards / Met vriendelijke groeten,
Niels Basjes