I have taken a look, but I don't see anything that would give me access to the input record's directory location. Could you point me in the right direction?
> On Nov 9, 2016, at 7:50 PM, David Ortiz <[email protected]> wrote:
>
> I can look up the exact methods in the morning, but in short, the DoFn does
> have a way to grab the TaskContext object when running using MapReduce. From
> there you can get the split.
>
> On Nov 9, 2016 9:56 PM, Marcin Michalski <[email protected]> wrote:
> Hi, is it possible to tie each record from the DoFn's process method to the
> single input split from CombineFileSplit? I basically want to get some info
> from the input HDFS directory of the input split (/data/20161109/11) and use
> it to enhance each record that is being read by the process method. I was able
> to hack around the access issue of CrunchInputSplit by using reflection, but then
> I am not sure how to tie each input record to one input split, since my job
> reads multiple files from different directories that have date/hour
> information that I need.
>
>     @Override
>     public void process(GenericData.Record eventRecord,
>                         Emitter<Pair<String, GenericData.Record>> pairEmitter) {
>       if (getContext() instanceof MapContext) {
>         InputSplit inputSplit = ((MapContext) getContext()).getInputSplit();
>         Class<? extends InputSplit> splitClass = inputSplit.getClass();
>
>         try {
>           Method getInputSplitMethod = splitClass.getDeclaredMethod("get");
>           getInputSplitMethod.setAccessible(true);
>           CombineFileSplit fileSplit =
>               (CombineFileSplit) getInputSplitMethod.invoke(inputSplit);
>
>           System.out.println("number of input files: " + fileSplit.getPaths().length);
>           int index = 0;
>           for (Path p : fileSplit.getPaths()) {
>             System.out.println("split length: " + fileSplit.getLength(index)
>                 + " partition: " + getPartitionDt(fileSplit.getPath(index)));
>             index++;
>           }
>         } catch (Exception e) {
>           System.out.println("we have a problem");
>           e.printStackTrace();
>         }
>       }
>     }
>
> ...now I want to output a pair of partition info (YYYYMMDDHH) and some
> modified Avro record.
> Any idea how I can get the directory information of the input split that is
> being processed by each call of the process method?
>
>     ...
>     pairEmitter.emit(Pair.of(partition, some_avro_record));
>
> I know that I could disable the combined input file format, but I don't want
> to do that.
>
> Thanks!
> --
> Marcin Michalski | Big Data Engineer
> [email protected] | (917) 478-9422 (c)
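The quoted code calls a `getPartitionDt` helper whose body isn't shown. Below is a minimal sketch of what it might look like, assuming input paths follow the `/data/YYYYMMDD/HH/...` layout of the example directory `/data/20161109/11`. The class name `PartitionDt` is hypothetical, and the method takes a `String` rather than a Hadoop `Path` so the sketch has no Hadoop dependency (call it as `getPartitionDt(fileSplit.getPath(index).toString())`). It only turns a known path into the YYYYMMDDHH key; it does not by itself tell you which file of the `CombineFileSplit` the current record came from.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Crunch or Hadoop): extracts a YYYYMMDDHH
 * partition key from a path laid out as .../YYYYMMDD/HH/...
 */
public final class PartitionDt {

  // An 8-digit date directory followed by a 2-digit hour directory, where the
  // hour is followed by another "/" or by the end of the path.
  private static final Pattern DATE_HOUR =
      Pattern.compile("/(\\d{8})/(\\d{2})(?=/|$)");

  private PartitionDt() {}

  /** Returns e.g. "2016110911" for ".../20161109/11/...", or null if the path has no date/hour dirs. */
  public static String getPartitionDt(String path) {
    Matcher m = DATE_HOUR.matcher(path);
    if (m.find()) {
      return m.group(1) + m.group(2);
    }
    return null;
  }

  public static void main(String[] args) {
    // prints 2016110911
    System.out.println(
        getPartitionDt("hdfs://namenode:8020/data/20161109/11/part-00000.avro"));
  }
}
```

Returning `null` for non-matching paths leaves it to the caller to decide whether to skip the record or fail the task.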
