Hi Aljoscha,

On Thu, Jul 20, 2017 at 10:31 AM, Aljoscha Krettek <aljos...@apache.org>
 wrote:

> Just for testing, could you try and place your word-count-beam-0.1.jar
> directly in the Flink lib folder and don’t specify it with --filesToStage?
>


I did that and it solved the problem.
Let me elaborate.

First of all, to get the wordcount running in combination with HDFS I needed
to change a few things.

I added this to the shade plugin (pom.xml) to avoid needing to specify the
main class:

<transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  <mainClass>org.apache.beam.examples.WordCount</mainClass>
</transformer>

and these dependencies to get access to HDFS:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.0</version>
</dependency>

and in WordCount.java I changed this

       public interface WordCountOptions extends HadoopFileSystemOptions {

and added this to the main method:

       options.setHdfsConfiguration(Collections.singletonList(new Configuration()));

After building the jar and copying it into the Flink lib folder with

              mvn clean package -Pflink-runner
              cp target/word-count-beam-0.1.jar flink-1.2.1/lib/

I ran

./flink-1.2.1/bin/flink run \
    -m yarn-cluster                                         \
    --yarncontainer                 1                       \
    --yarnslots                     4                       \
    --yarnjobManagerMemory          2000                    \
    --yarntaskManagerMemory         2000                    \
    --yarnname "Word count on Beam on Flink on Yarn"        \
    ./flink-1.2.1/lib/word-count-beam-0.1.jar               \
    --runner=FlinkRunner                                    \
    --inputFile=hdfs:///user/nbasjes/helloworld.txt         \
    --output=hdfs:///user/nbasjes/hello-counts


Which succeeded.

Using this I had enough hints to start digging.

I found that my setup is:
1) HDP 2.3.4.0-3485 (which I know is 'not that new')
2) Everything is configured correctly, including the environment variables:
HADOOP_CONF_DIR=/etc/hadoop/conf/
HBASE_CONF_DIR=/etc/hbase/conf/
HIVE_CONF_DIR=/etc/hive/conf/
YARN_CONF_DIR=/etc/hadoop/conf/

3) Because HBASE_CONF_DIR has been defined (and it should be, because I need
HBase) the flink script includes the output of 'hbase classpath' (side
note: I wrote that for Flink a long time ago).
4) In the classpath of the application you now have:
     /usr/hdp/2.3.4.0-3485/hbase/lib/jruby-complete-1.6.8.jar
which bundles its own copy of Joda-Time, dated 2010:
     2313  09-14-2010 00:05   org/joda/time/Duration.class
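
For anyone who wants to repeat this kind of search, here is a minimal sketch of how it could be automated. It is not the method used above (that was a manual look at the jar listing); the class name FindClassInJars and its method are made up for illustration, and it assumes a JDK 11+ so the file can be run directly with 'java FindClassInJars.java'. It walks a classpath string and reports every jar that contains a given entry, in classpath order.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipFile;

// Hypothetical helper, not part of Flink, Beam or HBase.
final class FindClassInJars {

    /** Returns the jars on the classpath that contain entryName, in classpath order. */
    static List<String> jarsContaining(String classpath, String entryName) {
        List<String> hits = new ArrayList<>();
        for (String element : classpath.split(File.pathSeparator)) {
            File file = new File(element);
            // Directories and wildcard entries such as "lib/*" are skipped, not expanded.
            if (!file.isFile() || !element.endsWith(".jar")) {
                continue;
            }
            try (ZipFile zip = new ZipFile(file)) {
                if (zip.getEntry(entryName) != null) {
                    hits.add(element);
                }
            } catch (IOException e) {
                // Unreadable or corrupt archive: ignore it and keep scanning.
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Defaults: the JVM's own classpath and the class that caused trouble here.
        String classpath = args.length > 0 ? args[0] : System.getProperty("java.class.path");
        String entry = args.length > 1 ? args[1] : "org/joda/time/Duration.class";
        for (String jar : jarsContaining(classpath, entry)) {
            System.out.println(jar);
        }
    }
}
```

Something like 'java FindClassInJars.java "$(hbase classpath)"' would then list the candidate jars. More than one hit means the jar that comes first on the classpath decides which version of the class gets loaded.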



So I tried to put 'just' a newer joda version in the lib directory and keep
the application itself in a 'normal' location.

rm flink-1.2.1/lib/word-count-beam-0.1.jar
cp /usr/hdp/2.3.4.0-3485/hadoop-mapreduce/joda-time-2.9.1.jar flink-1.2.1/lib/

hdfs dfs -rm hello-counts*


./flink-1.2.1/bin/flink run \
    -m yarn-cluster                                         \
    --yarncontainer                 1                       \
    --yarnslots                     4                       \
    --yarnjobManagerMemory          2000                    \
    --yarntaskManagerMemory         2000                    \
    --yarnname "Word count on Beam on Flink on Yarn"        \
    ./target/word-count-beam-0.1.jar                        \
    --runner=FlinkRunner                                    \
    --inputFile=hdfs:///user/nbasjes/helloworld.txt         \
    --output=hdfs:///user/nbasjes/hello-counts

hdfs dfs -cat hello-counts*


And this works too!
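
My assumption (not verified here) is that this works because jars in the Flink lib folder end up ahead of the 'hbase classpath' entries, so the newer Joda-Time wins. A rough way to check which jar a class really comes from at runtime is sketched below. WhichJar is a hypothetical helper name, and the answer only reflects the classloader that loaded the helper itself, so it would have to be called from inside the job's own code to be meaningful.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or Beam.
final class WhichJar {

    /** Returns the location (usually a jar URL) the named class was loaded from. */
    static String locationOf(String className) throws ClassNotFoundException {
        Class<?> type = Class.forName(className);
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes from the JDK itself may have no code source at all.
        if (source == null || source.getLocation() == null) {
            return "(no code source available)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String className = args.length > 0 ? args[0] : "org.joda.time.Duration";
        System.out.println(className + " loaded from " + locationOf(className));
    }
}
```

Logging locationOf("org.joda.time.Duration") from the main method of the pipeline should show either the jruby-complete jar or the joda-time jar, depending on which one comes first.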

Thanks for the suggestion!

Next step: getting both Kafka and HBase connections running ... :)

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
