All said and done, does this smell like a hack, or is it acceptable for my use case, where I am only interested in making my SequenceFiles and their contents use Pig to its fullest?
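For concreteness, here is roughly what one of the pluggable converters described in the thread below might look like, for a hypothetical TextPair key (two Text fields) paired with a DoubleWritable value. The TextPair type and its getFirst()/getSecond() accessors are made-up stand-ins, not code from the thread; the real converters are in the pastebin linked further down.

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.DoubleWritable;

// Sketch only: TextPair is a hypothetical custom Writable holding two
// Text fields, exposed via getFirst() and getSecond().
public class TextPairScoreConverter
    extends CustomWritableToTupleBaseConverter<TextPair, DoubleWritable> {

  @Override
  public void populateTupleList(TextPair key, DoubleWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    // Reduce the custom Writables to Pig-recognizable base types,
    // honoring column pruning (a null mRequiredColumns means "all").
    if (wanted(mRequiredColumns, 0)) {
      mProtoTuple.add(key.getFirst().toString());   // f1:chararray
    }
    if (wanted(mRequiredColumns, 1)) {
      mProtoTuple.add(key.getSecond().toString());  // f2:chararray
    }
    if (wanted(mRequiredColumns, 2)) {
      mProtoTuple.add(value.get());                 // f3:double
    }
  }

  private static boolean wanted(boolean[] required, int i) {
    return required == null || (i < required.length && required[i]);
  }
}

It would be wired in the same way as the a.b.b.SomeConverter example in the thread (package and class names illustrative):

grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.c.TextPairScoreConverter');
grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:double);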
On Mon, May 24, 2010 at 3:59 PM, Vishal Santoshi <[email protected]> wrote:

> Sorry Dmitriy.
>
> Let me explain our issue more lucidly. Most of our MR jobs use raw
> Hadoop (java impl) and create SequenceFiles with varying custom
> Writables. PigStorage is limited to text format, and the SequenceFile
> loading implementation in piggybank seems limited, in the sense that it:
>
> * does not provide for custom formats (like a TextPair or a Score that
>   may be composed of basic Writables such as Text, DoubleWritable, etc.)
> * does not provide for type/name mapping (the "AS" clause)
> * does not provide for limiting the inputs you may be interested in.
>
> I want to use a Loader that provides for something like this:
>
> LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray,
> f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
>
> Now this is well and good, and easy to write, if we have some standard
> (Text, NullWritable) SequenceFile with the Text holding ","-separated
> columns (almost a PigStorage, but feeding off a SequenceFile).
>
> In cases, though, where we have a SequenceFile of (CustomWritableKey,
> CustomWritableValue) and we would still like to extract the raw types
> and aggregate on them, the above fails, as chararray, int, etc. are
> limited to known types (and I may be wrong here).
>
> What I therefore tried was to reduce the CustomWritables to their raw
> types using an injectable converter. This converter takes the
> CustomWritables (the key and value of a SequenceFile record) and
> returns an ArrayList<Object> holding them reduced to their base types;
> the returned list is used to create the Tuple that getNext() returns.
>
> I think this code is more likely to tell the tale better:
>
> http://pastebin.com/QEwMztjU
>
> On Mon, May 24, 2010 at 3:32 PM, Dmitriy Ryaboy <[email protected]> wrote:
>
>> Vishal,
>> I am not sure what your question is. Could you describe your goals and
>> challenges before pasting in the implementation? It looks like the
>> bottom part of your email, with all the comments, got malformatted,
>> which may be the source of my confusion.
>>
>> Also, various services like pastebin and gist work better for code
>> sharing, as they can take care of highlighting and things of that
>> nature, which is handy for reviews.
>>
>> Thanks
>> -Dmitriy
>>
>> On Mon, May 24, 2010 at 9:41 AM, Vishal Santoshi <[email protected]> wrote:
>>
>> > I have this working, so I am seeking validation and corrections.
>> > We have SequenceFiles with various CustomWritables in Hadoop, and we
>> > want to be able to work with them from within Pig.
>> >
>> > I have taken PigStorage and the piggybank SequenceFileLoader as a
>> > template, and added pluggable converters that are fed through the
>> > SequenceFileLoader (which has a default).
>> > The below is part of the java file.
>> >
>> > public class SequenceFileLoader extends FileInputLoadFunc
>> >     implements LoadPushDown {
>> >
>> >   public SequenceFileLoader() {
>> >     converter = new TextConverter();
>> >   }
>> >
>> >   @SuppressWarnings("unchecked")
>> >   public SequenceFileLoader(String customWritableToTupleBaseConverter)
>> >       throws FrontendException {
>> >     try {
>> >       converter = (CustomWritableToTupleBaseConverter)
>> >           Class.forName(customWritableToTupleBaseConverter).newInstance();
>> >     } catch (Exception e) {
>> >       throw new FrontendException(e);
>> >     }
>> >   }
>> >
>> >   @SuppressWarnings("unchecked")
>> >   @Override
>> >   public Tuple getNext() throws IOException {
>> >     if (!mRequiredColumnsInitialized) {
>> >       if (signature != null) {
>> >         Properties p =
>> >             UDFContext.getUDFContext().getUDFProperties(this.getClass());
>> >         mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(
>> >             p.getProperty(signature));
>> >       }
>> >       mRequiredColumnsInitialized = true;
>> >     }
>> >
>> >     boolean next = false;
>> >     try {
>> >       next = reader.nextKeyValue();
>> >     } catch (InterruptedException e) {
>> >       throw new IOException(e);
>> >     }
>> >     if (!next) return null;
>> >
>> >     key = reader.getCurrentKey();
>> >     value = reader.getCurrentValue();
>> >     converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
>> >     Tuple t = mTupleFactory.newTuple(mProtoTuple);
>> >     mProtoTuple.clear();
>> >     return t;
>> >   }
>> >
>> > and
>> >
>> > public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
>> >     V extends Writable> {
>> >
>> >   public abstract void populateTupleList(K key, V value,
>> >       boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
>> >       throws IOException;
>> > }
>> >
>> > Features:
>> > * Allows for a default format (TextConverter)
>> > ** (Text, NullWritable)
>> > *** The Text is treated as a COMMA (",") separated text array
>> > **** Consider a Text with values 1, 2, 3
>> > **** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader()
>> > **** grunt> A = LOAD 'input' USING SequenceFileLoader
>> > **** grunt> B = FOREACH A GENERATE $3
>> > **** grunt> 3
>> > * Allows for custom formats (example: TimeWritableTestLongConverter)
>> > ** It is up to the custom converter to provide the SequenceFileLoader
>> >    with the Writables
>> > *** public abstract void populateTupleList(K key, V value, boolean[]
>> >     mRequiredColumns, ArrayList<Object> mProtoTuple) throws IOException;
>> >     in the base class CustomWritableToTupleBaseConverter
>> > *** The custom converter has to convert its key/value (as specified by
>> >     the SequenceFile) into a list of Pig-recognizable DataTypes
>> > **** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
>> > **** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
>> >      f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
>> >      f7:double);
>> > **** grunt> B = FILTER A BY f7 + 1 > .5;
>> > ** Note that Pig has to be told the type of each column for it to do
>> >    the right conversion. In the above example, if f7 is not defined as
>> >    double, Pig will try to cast it to an int, as we are adding 1 to
>> >    the value.
>> > ** Note that the custom converter is an argument in the DEFINE call.
>> > * Allows for limiting the number of columns in the input
>> > ** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
>> >    f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
>> >    f7:double);
>> >
>> > Any issues anyone sees in this approach?
>> >
>> > I have chosen the path of least resistance, so any guidance will be
>> > appreciated.
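For reference, the default TextConverter mentioned in the feature list above could be as simple as the following sketch. It assumes a (Text, NullWritable) SequenceFile whose Text key holds comma-separated columns; the splitting and trimming details are illustrative, not a literal copy of the pastebin code.

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class TextConverter
    extends CustomWritableToTupleBaseConverter<Text, NullWritable> {

  @Override
  public void populateTupleList(Text key, NullWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    // The key is one comma-separated line; NullWritable carries no data.
    String[] fields = key.toString().split(",");
    for (int i = 0; i < fields.length; i++) {
      // Emit only the columns Pig asked for (null means "all columns").
      if (mRequiredColumns == null
          || (i < mRequiredColumns.length && mRequiredColumns[i])) {
        mProtoTuple.add(fields[i].trim());  // each column as a chararray
      }
    }
  }
}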
