Hi,

Packaging the flink-hadoop-compatibility dependency with your code into a
"fat" job jar should work as well.

Best,
Fabian

Am Mi., 10. Apr. 2019 um 15:08 Uhr schrieb Morven Huang <
morven.hu...@gmail.com>:

> Hi,
>
>
>
> I’m using Flink 1.5.6 and Hadoop 2.7.1.
>
>
>
> *My requirement is to read hdfs sequence file (SequenceFileInputFormat),
> then write it back to hdfs (SequenceFileAsBinaryOutputFormat with
> compression).*
>
>
>
> Below code won’t work until I copy the flink-hadoop-compatibility jar to
> FLINK_HOME/lib. I find a similar discussion
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html,
> do we have any update regarding this, or this is still the only way to get
> the hadoop compatibility work?
>
>
>
> If this is still the only way, do I need to copy that jar to every node of
> the cluster?
>
>
>
> Or, for my SUPER simple requirement above, is there any other way to go?
> For example, without using  flink-hadoop-compatibility?
>
>
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
>
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
>
> import org.apache.flink.api.java.operators.DataSource;
>
> import org.apache.flink.api.java.operators.FlatMapOperator;
>
> import org.apache.flink.api.java.tuple.Tuple2;
>
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
>
> import org.apache.flink.util.Collector;
>
> import org.apache.hadoop.fs.Path;
>
> import org.apache.hadoop.io.BytesWritable;
>
> import org.apache.hadoop.io.NullWritable;
>
> import org.apache.hadoop.io.SequenceFile.CompressionType;
>
> import org.apache.hadoop.mapreduce.Job;
>
> import
> org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
>
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>
>
>
> import com.twitter.chill.protobuf.ProtobufSerializer;
>
>
>
> public class Foobar {
>
>
>
>         @SuppressWarnings("serial")
>
>         public static void main(String[] args) throws Exception {
>
>                  ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>
> env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class,
> ProtobufSerializer.class);
>
>
>
>                  String path = "hdfs://...";
>
>                  DataSource<Tuple2<NullWritable, BytesWritable>> input =
> env.createInput(HadoopInputs.readHadoopFile(
>
>                                   new
> org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable,
> BytesWritable>(),
>
>                                   NullWritable.class, BytesWritable.class,
> path),
>
>                                   new
> TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
> TypeInformation.of(BytesWritable.class)));
>
>
>
>                  FlatMapOperator<Tuple2<NullWritable, BytesWritable>,
> Tuple2<BytesWritable, BytesWritable>> x = input.flatMap(
>
>                                   new FlatMapFunction<Tuple2<NullWritable,
> BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {
>
>
>
>                                           @Override
>
>                                           public void
> flatMap(Tuple2<NullWritable, BytesWritable> value,
>
>
> Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
>
>                                                    ProtobufObject info =
> ProtobufObject.parseFrom(value.f1.copyBytes());
>
>                                                    String key =
> info.getKey();
>
>                                                    out.collect(new
> Tuple2<BytesWritable, BytesWritable>(new BytesWritable(key.getBytes()),
>
>                                                                    new
> BytesWritable(info.toByteArray())));
>
>                                           }
>
>                                   });
>
>
>
>                  Job job = Job.getInstance();
>
>                  HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF
> = new HadoopOutputFormat<BytesWritable, BytesWritable>(
>
>                                   new SequenceFileAsBinaryOutputFormat(),
> job);
>
>
>
>
> hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress",
> "true");
>
>
> hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
> CompressionType.BLOCK.toString());
>
>                  TextOutputFormat.setOutputPath(job, new
> Path("hdfs://..."));
>
>
>
>                  x.output(hadoopOF);
>
>                  env.execute("foo");
>
>         }
>
> }
>

Reply via email to