Hi, is it possible to tie each record passed to a DoFn's process method to
the individual input split inside the CombineFileSplit it came from? I
basically want to get some info from the HDFS directory of the input split
(/data/*20161109/11*) and use it to enhance each record that is read by the
process method. I was able to hack around the access restriction on
CrunchInputSplit by using reflection, but I am not sure how to tie each
input record to one input split, since my job reads multiple files from
different directories that carry the date/hour information I need.


@Override
public void process(GenericData.Record eventRecord,
        Emitter<Pair<String, GenericData.Record>> pairEmitter) {
    if (getContext() instanceof MapContext) {
        InputSplit inputSplit = ((MapContext) getContext()).getInputSplit();
        Class<? extends InputSplit> splitClass = inputSplit.getClass();

        try {
            // CrunchInputSplit.get() is not accessible, so invoke it via reflection.
            Method getInputSplitMethod = splitClass.getDeclaredMethod("get");
            getInputSplitMethod.setAccessible(true);
            CombineFileSplit fileSplit =
                    (CombineFileSplit) getInputSplitMethod.invoke(inputSplit);

            int numPaths = fileSplit.getPaths().length;
            System.out.println("number of input files: " + numPaths);
            // Print the length and partition of every file in the combined split.
            for (int index = 0; index < numPaths; index++) {
                System.out.println("split length: " + fileSplit.getLength(index)
                        + " partition: " + getPartitionDt(fileSplit.getPath(index)));
            }
        } catch (Exception e) {
            System.out.println("we have a problem");
            e.printStackTrace();
        }
    }
}

...now I want to output a pair of partition info (YYYYMMDDHH) and some
modified Avro record. Any idea how I can get the directory information of
the input split that is being processed by each call of the process method?
...
pairEmitter.emit(Pair.of(partition, some_avro_record));
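
For reference, here is a minimal sketch of how the YYYYMMDDHH partition string could be derived from a file path once the path is known. It is not the getPartitionDt used above: the class name PartitionKeys, the method partitionFromPath, and the assumed layout .../YYYYMMDD/HH/<file> are all hypothetical, chosen only to illustrate the idea with plain strings.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: the name and the .../YYYYMMDD/HH/... directory layout
// are assumptions made for illustration, not part of the original job.
final class PartitionKeys {
    // An 8-digit date directory immediately followed by a 2-digit hour directory.
    private static final Pattern DATE_HOUR =
            Pattern.compile("/(\\d{8})/(\\d{2})(?:/|$)");

    private PartitionKeys() {}

    /**
     * Returns the partition as YYYYMMDDHH, or null when the path contains
     * no date/hour directory pair.
     */
    static String partitionFromPath(String path) {
        Matcher m = DATE_HOUR.matcher(path);
        return m.find() ? m.group(1) + m.group(2) : null;
    }
}
```

Inside the loop above this could be called as PartitionKeys.partitionFromPath(fileSplit.getPath(index).toString()). It only covers parsing the path; it does not by itself tell me which file of the combined split the current record came from, which is the part I am still missing.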

I know that I could disable the combined input file format, but I don't
want to do that.

Thanks!
-- 
Marcin Michalski | Big Data Engineer
(917) 478-9422 (c)
<http://www.ifwe.co/>
Tagged, Inc. is now if(we). Learn more at ifwe.co
