Sorry Dmitriy. Let me explain our issue more lucidly. Most of our MR jobs use raw Hadoop (the Java implementation) and create SequenceFiles with varying custom Writables. PigStorage is limited to text input, and the SequenceFile loader implementation in piggybank seems limited, in the sense that it:

* does not provide for custom formats (like a TextPair or a Score that may be built from basic Writables such as Text, DoubleWritable, etc.)
* does not provide for type/name mapping (the "AS" clause)
* does not provide for limiting the inputs you may be interested in

I want to use a Loader that provides for something like this:

LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);

Now this is well and good, and easy to write, if we have a standard (Text, NullWritable) SequenceFile with the Text holding ","-separated columns (almost a PigStorage, but feeding off a SequenceFile). In cases, though, where we have a SequenceFile of (CustomWritableKey, CustomWritableValue) and we would still like to extract the raw types and aggregate on them, the above fails, as chararray, int, etc. are limited to known types (and I may be wrong here).

What I therefore tried was to reduce the CustomWritables to their raw types using an injectable Converter. The Converter takes the CustomWritable key and value of a SequenceFile record and returns an ArrayList<Object> holding the CustomWritables reduced to their base types; that list is used to create the Tuple returned from getNext(). I think the code tells the tale better: http://pastebin.com/QEwMztjU
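To make the Converter idea concrete, here is a stripped-down sketch. The Writable layout and the class names (ScoreWritable, ScoreConverter) are illustrative, not our actual production types; it builds on the CustomWritableToTupleBaseConverter base class quoted further down, and each top-level class would live in its own file:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical custom value type: a name plus a score.
public class ScoreWritable implements Writable {

  private final Text name = new Text();
  private final DoubleWritable score = new DoubleWritable();

  public void write(DataOutput out) throws IOException {
    name.write(out);
    score.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    name.readFields(in);
    score.readFields(in);
  }

  public String getName()  { return name.toString(); }
  public double getScore() { return score.get(); }
}

// Reduces a (LongWritable, ScoreWritable) record to base types that Pig
// recognizes: long, chararray (String), double.
public class ScoreConverter
    extends CustomWritableToTupleBaseConverter<LongWritable, ScoreWritable> {

  @Override
  public void populateTupleList(LongWritable key, ScoreWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    // Only emit the columns Pig asked for (null means "all columns").
    if (mRequiredColumns == null || mRequiredColumns[0])
      mProtoTuple.add(key.get());          // f1: long
    if (mRequiredColumns == null || mRequiredColumns[1])
      mProtoTuple.add(value.getName());    // f2: chararray
    if (mRequiredColumns == null || mRequiredColumns[2])
      mProtoTuple.add(value.getScore());   // f3: double
  }
}

It would be wired in like any other converter, e.g. DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.c.ScoreConverter') and then LOAD ... AS (f1:long, f2:chararray, f3:double).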
On Mon, May 24, 2010 at 3:32 PM, Dmitriy Ryaboy <[email protected]> wrote:
> Vishal,
> I am not sure what your question is. Could you describe your goals and
> challenges before pasting in the implementation? It looks like the bottom
> part of your email, with all the comments, got malformatted, which may be
> the source of my confusion.
>
> Also, various services like pastebin and gist work better for code sharing,
> as they can take care of highlighting and things of that nature, which is
> handy for reviews.
>
> Thanks
> -Dmitriy
>
> On Mon, May 24, 2010 at 9:41 AM, Vishal Santoshi <[email protected]> wrote:
>
> > I have this working, so seeking validation and corrections.
> > We have SequenceFiles with various custom Writables in Hadoop, and we want
> > to be able to work with them from within Pig.
> >
> > I have taken PigStorage and the piggybank SequenceFileLoader as a template
> > and added pluggable converters that are fed through the SequenceFileLoader
> > (which has a default).
> > The below is part of the Java file.
> > public class SequenceFileLoader extends FileInputLoadFunc
> >     implements LoadPushDown {
> >
> >   public SequenceFileLoader() {
> >     converter = new TextConverter();
> >   }
> >
> >   @SuppressWarnings("unchecked")
> >   public SequenceFileLoader(String customWritableToTupleBaseConverter)
> >       throws FrontendException {
> >     try {
> >       converter = (CustomWritableToTupleBaseConverter)
> >           Class.forName(customWritableToTupleBaseConverter).newInstance();
> >     } catch (Exception e) {
> >       throw new FrontendException(e);
> >     }
> >   }
> >
> >   @SuppressWarnings("unchecked")
> >   @Override
> >   public Tuple getNext() throws IOException {
> >     if (!mRequiredColumnsInitialized) {
> >       if (signature != null) {
> >         Properties p =
> >             UDFContext.getUDFContext().getUDFProperties(this.getClass());
> >         mRequiredColumns =
> >             (boolean[]) ObjectSerializer.deserialize(p.getProperty(signature));
> >       }
> >       mRequiredColumnsInitialized = true;
> >     }
> >
> >     boolean next = false;
> >     try {
> >       next = reader.nextKeyValue();
> >     } catch (InterruptedException e) {
> >       throw new IOException(e);
> >     }
> >
> >     if (!next) return null;
> >
> >     key = reader.getCurrentKey();
> >     value = reader.getCurrentValue();
> >     converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
> >     Tuple t = mTupleFactory.newTuple(mProtoTuple);
> >     mProtoTuple.clear();
> >     return t;
> >   }
> >
> > and
> >
> > public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
> >     V extends Writable> {
> >
> >   public abstract void populateTupleList(K time, V value,
> >       boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
> >       throws IOException;
> > }
> >
> > Features:
> > * Allows for a default format (TextConverter)
> > ** (Text, NullWritable)
> > *** The Text is treated as a COMMA (",") separated text array
> > **** Consider a Text with values 1 , 2 , 3
> > **** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader()
> > **** grunt> A = LOAD 'input' USING SequenceFileLoader
> > **** grunt> B = FOREACH A GENERATE $3
> > **** grunt> 3
> > * Allows for custom formats (example: TimeWritableTestLongConverter)
> > ** It is up to the custom Converter to provide the SequenceFileLoader with the Writables
> > *** via populateTupleList(K time, V value, boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) in the base class CustomWritableToTupleBaseConverter
> > *** The custom Converter has to convert its key/value (as specified by the SequenceFile) into a list of Pig-recognizable DataTypes
> > **** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
> > **** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
> > **** grunt> B = FILTER A BY f7 + 1 > .5;
> > ** Note that Pig has to be told the type of a column for it to do the right conversion. In the above example, if f7 is not defined as double, Pig will try to cast it to an int, as we are adding 1 to the value.
> > ** Note that the custom converter is an argument to the DEFINE call.
> > * Allows for limiting the number of columns in the input
> > ** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
> >
> > Any issues anyone sees in this approach?
> >
> > I have chosen the path of least resistance .. so any guidance will be appreciated.
> >
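For completeness, the default TextConverter mentioned in the features above boils down to roughly this shape (a sketch, not the exact code from the pastebin; the trim() is an assumption based on the "1 , 2 , 3" sample):

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Default converter for (Text, NullWritable) SequenceFiles: the Text key
// is treated as a comma-separated array of chararray columns.
public class TextConverter
    extends CustomWritableToTupleBaseConverter<Text, NullWritable> {

  @Override
  public void populateTupleList(Text key, NullWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    String[] cols = key.toString().split(",");
    for (int i = 0; i < cols.length; i++) {
      if (mRequiredColumns == null || mRequiredColumns[i]) {
        // trim() handles padded values like "1 , 2 , 3" (an assumption).
        mProtoTuple.add(cols[i].trim());
      }
    }
  }
}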
