I double-checked target/word-count-beam-0.1.jar and found that it
includes org/joda/time/Duration.class,
which (according to "javap -s Duration.class") does contain

  public static org.joda.time.Duration millis(long);
    descriptor: (J)Lorg/joda/time/Duration;

I'm at a loss as to where this goes wrong.
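
Since a NoSuchMethodError on a method that is demonstrably inside the fat jar
usually means an older copy of the same class is found first at runtime, my
next step would be to scan the jars on the node for other copies of
Duration.class. A minimal helper sketch, assuming a POSIX shell with unzip
available (the flink-1.2.1/lib path below is just my local layout):

```shell
# find_class: print every jar under a directory that contains the given
# class file entry. Useful for spotting a shadowing copy of a class.
find_class() {
  dir="$1"
  cls="$2"
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue        # skip when no *.jar matches at all
    if unzip -l "$jar" 2>/dev/null | grep -q "$cls"; then
      echo "$jar"
    fi
  done
}

# e.g. find_class flink-1.2.1/lib org/joda/time/Duration.class
```

Any hit outside word-count-beam-0.1.jar would be a candidate for the version
conflict.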

Niels


On Wed, Jul 19, 2017 at 12:34 PM, Niels Basjes <[email protected]> wrote:

> Hi,
>
> We have a (Kerberos-secured) YARN cluster on which we run (among many
> others) our Apache Flink applications.
> I'm having trouble getting even the simplest Beam application to run in
> this setup using Apache Flink as the runner.
>
> For these applications we effectively run a command similar to this:
>
> flink run -m yarn-cluster                  \
>      --yarnstreaming                       \
>      --yarncontainer                 1     \
>      --yarnslots                     10     \
>      --yarnjobManagerMemory          3100  \
>      --yarntaskManagerMemory         3100  \
>      --yarnname "My application" \
>      my-application.jar
>
> In the flink-conf.yaml there are a few settings needed to make everything
> work (port numbers and such).
>
> To get started, I wanted to run the basic 'word count' example on our
> cluster, just to get the plumbing of submitting a job right.
>
> I start with the Java Quickstart
> ( https://beam.apache.org/get-started/quickstart-java/ )
>
>  mvn archetype:generate \
>       -DarchetypeGroupId=org.apache.beam \
>       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
>       -DarchetypeVersion=2.0.0 \
>       -DgroupId=org.example \
>       -DartifactId=word-count-beam \
>       -Dversion="0.1" \
>       -Dpackage=org.apache.beam.examples \
>       -DinteractiveMode=false
>
>
> To ensure the jar manifest declares the main class, I make this change:
>
> diff --git pom.xml pom.xml
> index bcd9258..201d48f 100644
> --- pom.xml
> +++ pom.xml
> @@ -110,6 +110,9 @@
>                </filters>
>                <transformers>
>                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> +                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> +                  <mainClass>org.apache.beam.examples.WordCount</mainClass>
> +                </transformer>
>                </transformers>
>              </configuration>
>            </execution>
>
>
> Now I build this with the flink-runner built in:
>
> mvn clean package -Pflink-runner
>
> In the output I see
>      [INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
>
> So I downloaded the exact same Flink/Scala version, built for Hadoop 2.7.x
> (which we have):
>       http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz
>
> I created a helloworld.txt file and put it on HDFS.
>
> I then run this using
>
> ./flink-1.2.1/bin/flink run \
>     -m yarn-cluster \
>     --yarncontainer 4 \
>     --yarnslots                     4     \
>     --yarnjobManagerMemory          2000  \
>     --yarntaskManagerMemory         2000  \
>     --yarnname "Word count on Beam on Flink on Yarn" \
>     target/word-count-beam-0.1.jar \
>     --runner=FlinkRunner \
>     --flinkMaster=[auto] \
>     --filesToStage=target/word-count-beam-0.1.jar \
>     --inputFile=helloworld.txt \
>     --output=hello-counts
>
> The job gets submitted but fails with:
>
> Starting execution of program
> 2017-07-19 10:29:45,664 INFO  org.apache.flink.yarn.YarnClusterClient  - Starting program in interactive mode
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
> at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
> at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
> at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
> at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
> at com.sun.proxy.$Proxy45.as(Unknown Source)
> at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
> at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
> at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
> at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
> at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
> at org.apache.beam.examples.WordCount.main(WordCount.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
> at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
> 2017-07-19 10:29:46,362 INFO  org.apache.flink.yarn.YarnClusterClient  - Sending shutdown request to the Application Master
>
>
>
> Yet during the build I saw
>      [INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.
> and
>
> $ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
>      5290  2017-07-19 12:21   com/google/appengine/repackaged/org/joda/time/Duration.class
>      3748  2017-07-19 12:21   org/joda/time/Duration.class
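
One more check that might narrow this down: the JVM itself can report where
every class is loaded from. Running the client JVM once with class-loading
tracing and grepping for Duration should show which copy wins. A sketch using
a trivial invocation (-verbose:class is the JDK 8 flag, newer JDKs spell it
-Xlog:class+load; on the cluster the option would also have to reach the
client JVM, e.g. via env.java.opts in flink-conf.yaml, which is an assumption
on my part):

```shell
# Demonstrate class-loading tracing on a trivial run; each traced line
# names the jar or module a class came from. Against the real job I would
# grep for 'org.joda.time.Duration' instead.
java -verbose:class -version 2>&1 | grep 'java.lang.Object' | head -1
```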
>
> What am I doing wrong?
> All help is appreciated!
>
> Thanks.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
