Hi, 

I’m using Flink 1.5.6 and Hadoop 2.7.1. 

My requirement is to read an HDFS sequence file (SequenceFileInputFormat), then 
write it back to HDFS (SequenceFileAsBinaryOutputFormat with block compression).

The code below does not work until I copy the flink-hadoop-compatibility jar into 
FLINK_HOME/lib. I found a similar discussion here: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html
It seems that copying the jar is the only way to get the Hadoop compatibility classes to work?

If this is the case, do I need to copy that jar to every node of the cluster? 

Or, for the simple requirement above, is there another way to go that does not 
use flink-hadoop-compatibility at all?
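
For instance, could the read side be replaced with a plain Flink FileInputFormat 
that wraps Hadoop's SequenceFile.Reader directly? Below is an untested sketch of 
what I have in mind (the class name RawSequenceFileInputFormat is made up). It 
emits raw byte[] values, so no Writable types would have to go through Flink's 
type system at all:

import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

// Untested sketch: reads each sequence file as a whole and emits the raw
// value bytes, so no Writable types appear on the Flink side.
public class RawSequenceFileInputFormat extends FileInputFormat<byte[]> {

    private transient SequenceFile.Reader reader;
    private transient boolean end;

    public RawSequenceFileInputFormat() {
        // Read whole files only, to avoid sync-marker handling for splits.
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        // Open the file with Hadoop's own reader instead of Flink's stream.
        reader = new SequenceFile.Reader(new Configuration(),
                SequenceFile.Reader.file(new Path(split.getPath().toUri())));
        end = false;
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public byte[] nextRecord(byte[] reuse) throws IOException {
        NullWritable key = NullWritable.get();
        BytesWritable value = new BytesWritable();
        if (!reader.next(key, value)) {
            end = true;
            return null;
        }
        return value.copyBytes();
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}

If something like this is viable, the read would become 
env.readFile(new RawSequenceFileInputFormat(), path); the compressed write side 
would presumably still need a similar custom OutputFormat around 
SequenceFile.Writer.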

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

import com.twitter.chill.protobuf.ProtobufSerializer;

public class Foobar {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ProtobufObject is one of our protobuf-generated classes.
        env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class,
                ProtobufSerializer.class);

        // Read the sequence file as (NullWritable, BytesWritable) records.
        String path = "hdfs://...";
        DataSource<Tuple2<NullWritable, BytesWritable>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                        new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable, BytesWritable>(),
                        NullWritable.class, BytesWritable.class, path),
                new TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
                        TypeInformation.of(BytesWritable.class)));

        // Parse each value as a protobuf message and re-key the record by the
        // message's key field.
        FlatMapOperator<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>> x =
                input.flatMap(new FlatMapFunction<Tuple2<NullWritable, BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {

                    @Override
                    public void flatMap(Tuple2<NullWritable, BytesWritable> value,
                            Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {
                        ProtobufObject info = ProtobufObject.parseFrom(value.f1.copyBytes());
                        String key = info.getKey();
                        out.collect(new Tuple2<>(new BytesWritable(key.getBytes()),
                                new BytesWritable(info.toByteArray())));
                    }
                });

        // Write back as a block-compressed binary sequence file.
        Job job = Job.getInstance();
        HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF =
                new HadoopOutputFormat<>(new SequenceFileAsBinaryOutputFormat(), job);

        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
        hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
                CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputPath(job, new Path("hdfs://..."));

        x.output(hadoopOF);
        env.execute("foo");
    }
}
