I double-checked target/word-count-beam-0.1.jar and found that it
does include org/joda/time/Duration.class,
which (according to "javap -s Duration.class") does contain
public static org.joda.time.Duration millis(long);
descriptor: (J)Lorg/joda/time/Duration;
I'm at a loss as to where it goes wrong.
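One way I can think of to narrow it down: print at runtime which location a class was actually loaded from, to see whether an older joda-time somewhere on the Flink/Hadoop classpath shadows the shaded 2.4. A minimal sketch (the class name WhereFrom is made up; a JDK class stands in for org.joda.time.Duration so the sketch runs standalone):

```java
import java.security.CodeSource;

// Minimal sketch: report which location (jar or directory) a class was loaded from.
public class WhereFrom {
    static String locationOf(String className) {
        try {
            Class<?> c = Class.forName(className);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes from the bootstrap classpath have no CodeSource
            return src == null ? "(bootstrap classpath)" : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on the classpath at all: " + className + ")";
        }
    }

    public static void main(String[] args) {
        // A JDK class stands in here so the sketch runs standalone;
        // in the pipeline one would pass "org.joda.time.Duration".
        System.out.println(locationOf("java.lang.String"));
    }
}
```

Calling something like locationOf("org.joda.time.Duration") from inside the pipeline's main() should point at the jar that actually wins.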
Niels
On Wed, Jul 19, 2017 at 12:34 PM, Niels Basjes <[email protected]> wrote:
> Hi,
>
> We have a (Kerberos secured) Yarn cluster on which we run (among lots of
> others) our Apache Flink applications.
> I'm having trouble getting even the simplest Beam application to run in
> this setup using Apache Flink as the runner.
>
> For these applications we effectively run a command similar to this:
>
> flink run -m yarn-cluster \
> --yarnstreaming \
> --yarncontainer 1 \
> --yarnslots 10 \
> --yarnjobManagerMemory 3100 \
> --yarntaskManagerMemory 3100 \
> --yarnname "My application" \
> my-application.jar
>
> In the flink-conf.yaml there are a few settings needed to make everything
> work (port numbers and such).
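> For illustration only, this is the kind of setting I mean (the key is from
> the Flink configuration docs; the port range here is made up for our
> firewalled setup):
>
> ```yaml
> # restrict the YARN application master to a known port range
> yarn.application-master.port: 50100-50200
> ```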
>
> To get started, I wanted to run the basic 'word count' example on our
> cluster, just to get the plumbing of submitting a job right.
>
> I start with the Java Quickstart
> (https://beam.apache.org/get-started/quickstart-java/):
>
> mvn archetype:generate \
> -DarchetypeGroupId=org.apache.beam \
> -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> -DarchetypeVersion=2.0.0 \
> -DgroupId=org.example \
> -DartifactId=word-count-beam \
> -Dversion="0.1" \
> -Dpackage=org.apache.beam.examples \
> -DinteractiveMode=false
>
>
> To make sure the jar knows its main class, I change the pom.xml like this:
>
> diff --git pom.xml pom.xml
> index bcd9258..201d48f 100644
> --- pom.xml
> +++ pom.xml
> @@ -110,6 +110,9 @@
> </filters>
> </transformers>
> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> + <mainClass>org.apache.beam.examples.WordCount</mainClass>
> + </transformer>
> </transformers>
> </configuration>
> </execution>
>
>
> Now I build this with the flink-runner built in:
>
> mvn clean package -Pflink-runner
>
> In the output I see:
>
> [INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
>
> So I downloaded the exact same Flink/Scala version, built for Hadoop 2.7.x
> (which is what we have):
> http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz
>
> I created a file helloworld.txt and put it on HDFS.
>
> I then run it using:
>
> ./flink-1.2.1/bin/flink run \
> -m yarn-cluster \
> --yarncontainer 4 \
> --yarnslots 4 \
> --yarnjobManagerMemory 2000 \
> --yarntaskManagerMemory 2000 \
> --yarnname "Word count on Beam on Flink on Yarn" \
> target/word-count-beam-0.1.jar \
> --runner=FlinkRunner \
> --flinkMaster=[auto] \
> --filesToStage=target/word-count-beam-0.1.jar \
> --inputFile=helloworld.txt \
> --output=hello-counts
>
> The job gets submitted but fails with:
>
> Starting execution of program
> 2017-07-19 10:29:45,664 INFO org.apache.flink.yarn.YarnClusterClient - Starting program in interactive mode
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
> at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
> at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
> at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
> at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
> at com.sun.proxy.$Proxy45.as(Unknown Source)
> at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
> at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
> at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
> at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
> at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
> at org.apache.beam.examples.WordCount.main(WordCount.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
> 2017-07-19 10:29:46,362 INFO org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
>
>
>
> Yet during the build I saw:
>
> [INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.
>
> and:
>
> $ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
>     5290  2017-07-19 12:21   com/google/appengine/repackaged/org/joda/time/Duration.class
>     3748  2017-07-19 12:21   org/joda/time/Duration.class
>
> What am I doing wrong?
> All help is appreciated!
>
> Thanks.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
--
Best regards / Met vriendelijke groeten,
Niels Basjes