Perfect, thanks for letting us know!
> On 20. Jul 2017, at 17:25, Niels Basjes <[email protected]> wrote:
>
> Hi Aljoscha,
>
>
> On Thu, Jul 20, 2017 at 10:31 AM, Aljoscha Krettek <[email protected]
> <mailto:[email protected]>> wrote:
> Just for testing, could you try and place your word-count-beam-0.1.jar
> directly in the Flink lib folder and don’t specify it with --filesToStage?
>
>
> I did that and it solved the problem.
> Let me elaborate.
>
> First of all, to get the WordCount running against HDFS I needed
> to change a few things:
>
> I added this to the shade plugin (pom.xml) to avoid needing to specify the
> main class:
> <transformer
>     implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>   <mainClass>org.apache.beam.examples.WordCount</mainClass>
> </transformer>
> and these dependencies to get access to HDFS:
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>   <version>${beam.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.hadoop</groupId>
>   <artifactId>hadoop-client</artifactId>
>   <version>2.8.0</version>
> </dependency>
> and in WordCount.java I changed this
> public interface WordCountOptions extends HadoopFileSystemOptions {
> and added this to the main method:
>
> options.setHdfsConfiguration(Collections.singletonList(new Configuration()));
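>
> Put together, the HDFS-related bits look roughly like this (just a sketch; the
> rest of the example is unchanged, and I'm quoting the package names from
> memory):
>
> import java.util.Collections;
> import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.hadoop.conf.Configuration;
>
> public class WordCount {
>   public interface WordCountOptions extends HadoopFileSystemOptions {
>     // ... the existing WordCount options (inputFile, output, ...) ...
>   }
>
>   public static void main(String[] args) {
>     WordCountOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
>     // Load core-site.xml/hdfs-site.xml found via HADOOP_CONF_DIR so hdfs:// paths resolve.
>     options.setHdfsConfiguration(Collections.singletonList(new Configuration()));
>     // ... build and run the pipeline as in the original example ...
>   }
> }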
>
> after a
>
> mvn clean package -Pflink-runner
> cp target/word-count-beam-0.1.jar flink-1.2.1/lib/
>
> I ran
>
> ./flink-1.2.1/bin/flink run \
> -m yarn-cluster \
> --yarncontainer 1 \
> --yarnslots 4 \
> --yarnjobManagerMemory 2000 \
> --yarntaskManagerMemory 2000 \
> --yarnname "Word count on Beam on Flink on Yarn" \
> ./flink-1.2.1/lib/word-count-beam-0.1.jar \
> --runner=FlinkRunner \
> --inputFile=hdfs:///user/nbasjes/helloworld.txt \
> --output=hdfs:///user/nbasjes/hello-counts
>
> Which succeeded.
>
> Using this I had enough hints to start digging.
>
> I found that my setup is:
> 1) HDP 2.3.4.0-3485 (which I know is 'not that new')
> 2) Everything is configured correctly, including the environment variables
> HADOOP_CONF_DIR=/etc/hadoop/conf/
> HBASE_CONF_DIR=/etc/hbase/conf/
> HIVE_CONF_DIR=/etc/hive/conf/
> YARN_CONF_DIR=/etc/hadoop/conf/
>
> 3) Because HBASE_CONF_DIR has been defined (and it should be, because I need
> HBase), the flink script includes the output of 'hbase classpath' (side note:
> I wrote that part of the script for Flink a long time ago).
> 4) In the classpath of the application you now have:
> /usr/hdp/2.3.4.0-3485/hbase/lib/jruby-complete-1.6.8.jar
> which contains
> 2313 09-14-2010 00:05 org/joda/time/Duration.class
> i.e. an ancient (2010) Joda-Time bundled inside jruby-complete, which shadows
> the newer Joda-Time that Beam needs.
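>
> (For anyone hitting the same thing: a quick way to see which jar actually
> provides a class at runtime is something along these lines; 'WhichJar' is
> just a throwaway helper I'm sketching here, not part of the example.)
>
> import org.joda.time.Duration;
>
> public class WhichJar {
>   public static void main(String[] args) {
>     // Prints the location (jar) that org.joda.time.Duration was loaded from.
>     System.out.println(
>         Duration.class.getProtectionDomain().getCodeSource().getLocation());
>   }
> }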
>
>
>
> So I tried to put 'just' a newer Joda-Time jar in the Flink lib directory and
> keep the application jar itself in a 'normal' location.
>
> rm flink-1.2.1/lib/word-count-beam-0.1.jar
> cp /usr/hdp/2.3.4.0-3485/hadoop-mapreduce/joda-time-2.9.1.jar
> flink-1.2.1/lib/
> hdfs dfs -rm hello-counts*
>
> ./flink-1.2.1/bin/flink run \
> -m yarn-cluster \
> --yarncontainer 1 \
> --yarnslots 4 \
> --yarnjobManagerMemory 2000 \
> --yarntaskManagerMemory 2000 \
> --yarnname "Word count on Beam on Flink on Yarn" \
> ./target/word-count-beam-0.1.jar \
> --runner=FlinkRunner \
> --inputFile=hdfs:///user/nbasjes/helloworld.txt \
> --output=hdfs:///user/nbasjes/hello-counts
>
> hdfs dfs -cat hello-counts*
>
> And this works too!
>
> Thanks for the suggestion!
>
> Next step: getting both Kafka and HBase connections running ... :)
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes