Hi Fabian,

Packaging that dependency into a fat jar doesn't help, here is the pom.xml
I use, could you please help to take a look if there're some problems?

<project xmlns="http://maven.apache.org/POM/4.0.0";
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
<modelVersion>4.0.0</modelVersion>

<groupId>com.misc.flink</groupId>
<artifactId>foobar</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>foobar</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.5.6</flink.version>
<hadoop.version>2.7.1</hadoop.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Different groupId -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.7.4</version><!-- 0.5.2 -->
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- Add connector dependencies here. They must be in the default scope
(compile). -->

<!-- Example: <dependency> <groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version> </dependency> -->

<!-- Add logging framework, to produce console output when running in the
IDE. -->
<!-- These dependencies are excluded from the application JAR by default.
-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>

<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>

<!-- We use the maven-shade plugin to create a fat jar that contains all
necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry
point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise,
this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.misc.flink.SinkPbHdfsToHdfs</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>

<!-- This improves the out-of-the-box experience in Eclipse by resolving
some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is
'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>

<activation>
<property>
<name>idea.version</name>
</property>
</activation>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>

</project>


On Wed, Apr 10, 2019 at 9:53 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Packaging the flink-hadoop-compatibility dependency with your code into a
> "fat" job jar should work as well.
>
> Best,
> Fabian
>
> Am Mi., 10. Apr. 2019 um 15:08 Uhr schrieb Morven Huang <
> morven.hu...@gmail.com>:
>
>> Hi,
>>
>>
>>
>> I’m using Flink 1.5.6 and Hadoop 2.7.1.
>>
>>
>>
>> *My requirement is to read hdfs sequence file (SequenceFileInputFormat),
>> then write it back to hdfs (SequenceFileAsBinaryOutputFormat with
>> compression).*
>>
>>
>>
>> Below code won’t work until I copy the flink-hadoop-compatibility jar to
>> FLINK_HOME/lib. I find a similar discussion
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html,
>> do we have any update regarding this, or this is still the only way to get
>> the hadoop compatibility work?
>>
>>
>>
>> If this is still the only way, do I need to copy that jar to every node
>> of the cluster?
>>
>>
>>
>> Or, for my SUPER simple requirement above, is there any other way to go?
>> For example, without using  flink-hadoop-compatibility?
>>
>>
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>>
>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
>>
>> import org.apache.flink.api.java.operators.DataSource;
>>
>> import org.apache.flink.api.java.operators.FlatMapOperator;
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>>
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>
>> import org.apache.flink.hadoopcompatibility.HadoopInputs;
>>
>> import org.apache.flink.util.Collector;
>>
>> import org.apache.hadoop.fs.Path;
>>
>> import org.apache.hadoop.io.BytesWritable;
>>
>> import org.apache.hadoop.io.NullWritable;
>>
>> import org.apache.hadoop.io.SequenceFile.CompressionType;
>>
>> import org.apache.hadoop.mapreduce.Job;
>>
>> import
>> org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
>>
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>>
>>
>>
>> import com.twitter.chill.protobuf.ProtobufSerializer;
>>
>>
>>
>> public class Foobar {
>>
>>
>>
>>         @SuppressWarnings("serial")
>>
>>         public static void main(String[] args) throws Exception {
>>
>>                  ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>>
>> env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class,
>> ProtobufSerializer.class);
>>
>>
>>
>>                  String path = "hdfs://...";
>>
>>                  DataSource<Tuple2<NullWritable, BytesWritable>> input =
>> env.createInput(HadoopInputs.readHadoopFile(
>>
>>                                   new
>> org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable,
>> BytesWritable>(),
>>
>>                                   NullWritable.class,
>> BytesWritable.class, path),
>>
>>                                   new
>> TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
>> TypeInformation.of(BytesWritable.class)));
>>
>>
>>
>>                  FlatMapOperator<Tuple2<NullWritable, BytesWritable>,
>> Tuple2<BytesWritable, BytesWritable>> x = input.flatMap(
>>
>>                                   new
>> FlatMapFunction<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable,
>> BytesWritable>>() {
>>
>>
>>
>>                                           @Override
>>
>>                                           public void
>> flatMap(Tuple2<NullWritable, BytesWritable> value,
>>
>>
>> Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
>>
>>                                                    ProtobufObject info =
>> ProtobufObject.parseFrom(value.f1.copyBytes());
>>
>>                                                    String key =
>> info.getKey();
>>
>>                                                    out.collect(new
>> Tuple2<BytesWritable, BytesWritable>(new BytesWritable(key.getBytes()),
>>
>>                                                                    new
>> BytesWritable(info.toByteArray())));
>>
>>                                           }
>>
>>                                   });
>>
>>
>>
>>                  Job job = Job.getInstance();
>>
>>                  HadoopOutputFormat<BytesWritable, BytesWritable>
>> hadoopOF = new HadoopOutputFormat<BytesWritable, BytesWritable>(
>>
>>                                   new SequenceFileAsBinaryOutputFormat(),
>> job);
>>
>>
>>
>>
>> hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress",
>> "true");
>>
>>
>> hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
>> CompressionType.BLOCK.toString());
>>
>>                  TextOutputFormat.setOutputPath(job, new
>> Path("hdfs://..."));
>>
>>
>>
>>                  x.output(hadoopOF);
>>
>>                  env.execute("foo");
>>
>>         }
>>
>> }
>>
>

Reply via email to