Hi, is it possible to tie each record passed to a DoFn's process method to
the individual input split inside the CombineFileSplit it was read from? I
basically want to take some info from the HDFS directory of that input split
(/data/*20161109/11*) and use it to enrich each record read by the process
method. I was able to hack around the access issue with CrunchInputSplit by
using reflection, but I am not sure how to tie each input record to one
input split, since my job reads multiple files from different directories
that carry the date/hour information I need. Here is what I have so far:
    @Override
    public void process(GenericData.Record eventRecord,
                        Emitter<Pair<String, GenericData.Record>> pairEmitter) {
        if (getContext() instanceof MapContext) {
            InputSplit inputSplit = ((MapContext) getContext()).getInputSplit();
            Class<? extends InputSplit> splitClass = inputSplit.getClass();
            try {
                // CrunchInputSplit is not accessible from here, so unwrap the
                // underlying split by calling its get() method via reflection.
                Method getInputSplitMethod = splitClass.getDeclaredMethod("get");
                getInputSplitMethod.setAccessible(true);
                CombineFileSplit fileSplit =
                        (CombineFileSplit) getInputSplitMethod.invoke(inputSplit);
                System.out.println("number of input files: "
                        + fileSplit.getPaths().length);
                // Print the length and partition of every file in the combined split.
                for (int index = 0; index < fileSplit.getPaths().length; index++) {
                    System.out.println("split length: " + fileSplit.getLength(index)
                            + " partition: " + getPartitionDt(fileSplit.getPath(index)));
                }
            } catch (Exception e) {
                System.out.println("we have a problem");
                e.printStackTrace();
            }
        }
    }
...now I want to output a pair of partition info (YYYYMMDDHH) and a
modified Avro record. Any idea how I can get the directory information of
the input split that is being processed by each call of the process method?
...
    pairEmitter.emit(Pair.of(partition, some_avro_record));
I know that I could disable the combined input file format, but I don't want
to do that.
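
To make the YYYYMMDDHH partition key concrete, here is a minimal sketch of the kind of path parsing a helper such as getPartitionDt might do. This is not the actual helper from the job: the class name PartitionKeys, the method fromPath, and the regex are hypothetical. It assumes the layout hinted at by /data/*20161109/11*, that is, a directory whose name ends in an 8-digit date followed by a directory whose name starts with a 2-digit hour. It works on the path as a plain string (for example the result of Path.toString()), so it has no Hadoop dependency.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: derives a YYYYMMDDHH partition key from a file path. */
final class PartitionKeys {
    // Assumed layout: a directory name ending in an 8-digit date, followed by a
    // directory name starting with a 2-digit hour, e.g. /data/x20161109/11/part-0.avro
    private static final Pattern DATE_HOUR =
            Pattern.compile("(\\d{8})/(\\d{2})[^/]*/");

    private PartitionKeys() {}

    /** Returns date + hour as one string, or throws if the path does not match. */
    static String fromPath(String path) {
        Matcher m = DATE_HOUR.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException(
                    "No date/hour directories in path: " + path);
        }
        return m.group(1) + m.group(2);
    }
}
```

This only covers turning a path into a key; it does not answer which path inside the combined split the current record came from, which is the open question.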
Thanks!
--
Marcin Michalski | Big Data Engineer
[email protected] <[email protected]> | (917) 478-9422 (c)