Hi Niels,

This is unfortunate. I just double-checked the Flink distribution jars and they 
don’t contain any Duration.class, so they shouldn’t clash. The only explanation 
I can think of is that YARN has other dependencies on the class path that 
provide a Duration.class with a different signature.
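
To test that theory, a small check like the one below can report where the JVM
actually loaded a class from and whether it exposes the method named in the
error. This is only a sketch: ClassOrigin, locate and hasStaticMethod are names
made up for illustration, and the result is only meaningful when it runs with
the same class path as the failing Flink client.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or a note if the JVM does not expose it. */
    public static String locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown (bootstrap or platform class)";
        }
        return source.getLocation().toString();
    }

    /** True if the class exposes a public static method with exactly these parameter types. */
    public static boolean hasStaticMethod(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            Method method = type.getMethod(name, parameterTypes);
            return Modifier.isStatic(method.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults to the class from the stack trace; pass another class name as the first argument.
        String className = args.length > 0 ? args[0] : "org.joda.time.Duration";
        Class<?> type = Class.forName(className);
        System.out.println(className + " loaded from: " + locate(type));
        System.out.println("has static millis(long): " + hasStaticMethod(type, "millis", long.class));
    }
}
```

If the printed location is something other than your word-count-beam-0.1.jar,
that would suggest another jar is shadowing your copy of Joda-Time.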

Just for testing, could you try placing your word-count-beam-0.1.jar directly 
in the Flink lib folder and not specifying it with --filesToStage?
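
Before moving the jar, it may also be worth listing which jars in a directory
already contain org/joda/time/Duration.class, which is the same check as the
unzip/fgrep one quoted below but over a whole folder. A minimal sketch, assuming
the default directory flink-1.2.1/lib (adjust it to your layout); JarScanner and
findJarsContaining are hypothetical names, and only jars directly inside the
given directory are inspected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipFile;

public class JarScanner {

    /** Lists the jars directly inside dir that contain the given entry; empty if dir is not a directory. */
    public static List<Path> findJarsContaining(Path dir, String entryName) {
        List<Path> matches = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return matches;
        }
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                // A jar is a zip archive, so an exact entry lookup is enough here.
                try (ZipFile zip = new ZipFile(jar.toFile())) {
                    if (zip.getEntry(entryName) != null) {
                        matches.add(jar);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return matches;
    }

    public static void main(String[] args) {
        // Both defaults are assumptions taken from the commands in this thread.
        Path dir = Paths.get(args.length > 0 ? args[0] : "flink-1.2.1/lib");
        String entry = args.length > 1 ? args[1] : "org/joda/time/Duration.class";
        for (Path jar : findJarsContaining(dir, entry)) {
            System.out.println(jar);
        }
    }
}
```

Because the lookup is by exact entry name, relocated copies such as the
com/google/appengine/repackaged one are not reported. Pointing it at the
directories that make up the Hadoop/YARN class path on your cluster would cover
the other half of the theory.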

Best,
Aljoscha

> On 19. Jul 2017, at 14:28, Niels Basjes <ni...@basjes.nl> wrote:
> 
> I double-checked target/word-count-beam-0.1.jar and found that it 
> includes org/joda/time/Duration.class,
> which (according to "javap -s Duration.class") actually contains
> 
>   public static org.joda.time.Duration millis(long);
>     descriptor: (J)Lorg/joda/time/Duration;
> 
> I'm at a loss where it goes wrong.
> 
> Niels
> 
> 
> On Wed, Jul 19, 2017 at 12:34 PM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
> 
> We have a Kerberos-secured YARN cluster on which we run (among lots of 
> other things) our Apache Flink applications.
> I'm having trouble getting even the simplest Beam application to run in this 
> setup using Apache Flink as the runner.
> 
> For these applications we effectively run a command similar to this:
> 
> flink run -m yarn-cluster                  \
>      --yarnstreaming                       \
>      --yarncontainer                 1     \
>      --yarnslots                     10     \
>      --yarnjobManagerMemory          3100  \
>      --yarntaskManagerMemory         3100  \
>      --yarnname "My application" \
>      my-application.jar
> 
> In the flink-conf.yaml there are a few settings needed to make everything 
> work (port numbers and such).
> 
> To get started I wanted to simply run the base 'word count' example on our 
> cluster to get the plumbing of running a job right.
> 
> I start with the Java Quickstart
> (https://beam.apache.org/get-started/quickstart-java/):
> 
>  mvn archetype:generate \
>       -DarchetypeGroupId=org.apache.beam \
>       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
>       -DarchetypeVersion=2.0.0 \
>       -DgroupId=org.example \
>       -DartifactId=word-count-beam \
>       -Dversion="0.1" \
>       -Dpackage=org.apache.beam.examples \
>       -DinteractiveMode=false
> 
> To ensure it knows the main class I change this:
> 
> diff --git pom.xml pom.xml
> index bcd9258..201d48f 100644
> --- pom.xml
> +++ pom.xml
> @@ -110,6 +110,9 @@
>                </filters>
>                <transformers>
>                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
> +                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> +                  <mainClass>org.apache.beam.examples.WordCount</mainClass>
> +                </transformer>
>                </transformers>
>              </configuration>
>            </execution>
> 
> Now I build this with the flink-runner built in:
> 
> mvn clean package -Pflink-runner
> 
> In the output I see
>      [INFO] Including org.apache.flink:flink-streaming-java_2.10:jar:1.2.1 in the shaded jar.
> 
> So I get the exact same Flink/Scala version for Hadoop 2.7.x (which we have):
>      http://archive.apache.org/dist/flink/flink-1.2.1/flink-1.2.1-bin-hadoop27-scala_2.10.tgz
> 
> I created a helloworld.txt and put it on HDFS.
> 
> I then run this using
> 
> ./flink-1.2.1/bin/flink run \
>     -m yarn-cluster \
>     --yarncontainer 4 \
>     --yarnslots                     4     \
>     --yarnjobManagerMemory          2000  \
>     --yarntaskManagerMemory         2000  \
>     --yarnname "Word count on Beam on Flink on Yarn" \
>     target/word-count-beam-0.1.jar \
>     --runner=FlinkRunner \
>     --flinkMaster=[auto] \
>     --filesToStage=target/word-count-beam-0.1.jar \
>     --inputFile=helloworld.txt \
>     --output=hello-counts
> 
> The job gets submitted and fails with 
> 
> Starting execution of program
> 2017-07-19 10:29:45,664 INFO  org.apache.flink.yarn.YarnClusterClient - Starting program in interactive mode
> 
> ------------------------------------------------------------
>  The program finished with the following exception:
> 
> java.lang.NoSuchMethodError: org.joda.time.Duration.millis(J)Lorg/joda/time/Duration;
>       at org.apache.beam.sdk.util.GcsUtil.<clinit>(GcsUtil.java:146)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:264)
>       at com.sun.proxy.$Proxy32.<clinit>(Unknown Source)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at org.apache.beam.sdk.util.InstanceBuilder.buildFromConstructor(InstanceBuilder.java:256)
>       at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:164)
>       at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:217)
>       at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:138)
>       at com.sun.proxy.$Proxy45.as(Unknown Source)
>       at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar.fromOptions(GcsFileSystemRegistrar.java:44)
>       at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:483)
>       at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:473)
>       at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
>       at org.apache.beam.sdk.Pipeline.create(Pipeline.java:141)
>       at org.apache.beam.examples.WordCount.main(WordCount.java:175)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>       at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>       at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>       at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1079)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1126)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
>       at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1122)
> 2017-07-19 10:29:46,362 INFO  org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
> 
> 
> Yet during the build I saw 
>      [INFO] Including joda-time:joda-time:jar:2.4 in the shaded jar.
> and 
> 
> $ unzip -l target/word-count-beam-0.1.jar | fgrep org/joda/time/Duration.class
>      5290  2017-07-19 12:21   com/google/appengine/repackaged/org/joda/time/Duration.class
>      3748  2017-07-19 12:21   org/joda/time/Duration.class
> 
> What am I doing wrong?
> All help is appreciated!
> 
> Thanks.
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
