Hi,

Packaging the flink-hadoop-compatibility dependency with your code into a "fat" job jar should work as well.
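For example, with Maven you could add the module in compile scope (a sketch, assuming a Scala 2.11 build of Flink 1.5.6; adjust the suffix and version to your setup):

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.5.6</version>
  </dependency>

If you build with the quickstart POM, its maven-shade-plugin should then bundle the module into the job jar, so nothing has to be copied into FLINK_HOME/lib on the cluster nodes.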
Best,
Fabian

On Wed, Apr 10, 2019 at 3:08 PM Morven Huang <morven.hu...@gmail.com> wrote:

> Hi,
>
> I'm using Flink 1.5.6 and Hadoop 2.7.1.
>
> My requirement is to read an HDFS sequence file (SequenceFileInputFormat), then write it back to HDFS (SequenceFileAsBinaryOutputFormat with compression).
>
> The code below won't work until I copy the flink-hadoop-compatibility jar to FLINK_HOME/lib. I found a similar discussion at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html
> Is there any update regarding this, or is this still the only way to get the Hadoop compatibility to work?
>
> If this is still the only way, do I need to copy that jar to every node of the cluster?
>
> Or, for my SUPER simple requirement above, is there any other way to go? For example, without using flink-hadoop-compatibility?
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.FlatMapOperator;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.SequenceFile.CompressionType;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>
> import com.twitter.chill.protobuf.ProtobufSerializer;
>
> public class Foobar {
>
>     @SuppressWarnings("serial")
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class, ProtobufSerializer.class);
>
>         // Read the sequence file through the Hadoop-compatibility wrapper.
>         String path = "hdfs://...";
>         DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(
>                 HadoopInputs.readHadoopFile(
>                         new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
>                         NullWritable.class, BytesWritable.class, path),
>                 new TupleTypeInfo<>(TypeInformation.of(NullWritable.class), TypeInformation.of(BytesWritable.class)));
>
>         // Re-key each record by a field of the deserialized protobuf payload.
>         FlatMapOperator<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>> x = input.flatMap(
>                 new FlatMapFunction<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {
>
>                     @Override
>                     public void flatMap(Tuple2<NullWritable, BytesWritable> value,
>                             Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
>                         ProtobufObject info = ProtobufObject.parseFrom(value.f1.copyBytes());
>                         String key = info.getKey();
>                         out.collect(new Tuple2<BytesWritable, BytesWritable>(
>                                 new BytesWritable(key.getBytes()),
>                                 new BytesWritable(info.toByteArray())));
>                     }
>                 });
>
>         // Write back as a block-compressed sequence file.
>         Job job = Job.getInstance();
>         HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF =
>                 new HadoopOutputFormat<BytesWritable, BytesWritable>(new SequenceFileAsBinaryOutputFormat(), job);
>         hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
>         hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString());
>         TextOutputFormat.setOutputPath(job, new Path("hdfs://..."));
>
>         x.output(hadoopOF);
>         env.execute("foo");
>     }
> }