Vishal, I am not sure what your question is. Could you describe your goals and challenges before pasting in the implementation? It looks like the bottom part of your email, with all the comments, got malformatted, which may be the source of my confusion.
Also, services like pastebin and gist work better for sharing code, since they take care of highlighting and things of that nature, which is handy for reviews.

Thanks,
-Dmitriy

On Mon, May 24, 2010 at 9:41 AM, Vishal Santoshi <[email protected]> wrote:

> I have this working, so I am seeking validation and corrections.
> We have SequenceFiles with various custom Writables in Hadoop, and we want
> to be able to work with them from within Pig.
>
> I have taken PigStorage and the piggybank SequenceFileLoader as a template
> and added pluggable converters that are fed through the
> SequenceFileLoader (which has a default).
> Below is part of the Java file.
>
> public class SequenceFileLoader extends FileInputLoadFunc
>         implements LoadPushDown {
>
>     public SequenceFileLoader() {
>         converter = new TextConverter();
>     }
>
>     @SuppressWarnings("unchecked")
>     public SequenceFileLoader(String customWritableToTupleBaseCoverter)
>             throws FrontendException {
>         try {
>             converter = (CustomWritableToTupleBaseConverter)
>                     Class.forName(customWritableToTupleBaseCoverter).newInstance();
>         } catch (Exception e) {
>             throw new FrontendException(e);
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public Tuple getNext() throws IOException {
>         if (!mRequiredColumnsInitialized) {
>             if (signature != null) {
>                 Properties p =
>                         UDFContext.getUDFContext().getUDFProperties(this.getClass());
>                 mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(
>                         p.getProperty(signature));
>             }
>             mRequiredColumnsInitialized = true;
>         }
>         boolean next = false;
>         try {
>             next = reader.nextKeyValue();
>         } catch (InterruptedException e) {
>             throw new IOException(e);
>         }
>
>         if (!next) return null;
>
>         key = reader.getCurrentKey();
>         value = reader.getCurrentValue();
>         converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
>         Tuple t = mTupleFactory.newTuple(mProtoTuple);
>         mProtoTuple.clear();
>         return t;
>     }
>
> and
> public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
>         V extends Writable> {
>
>     public abstract void populateTupleList(K time, V value,
>             boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
>             throws IOException;
>
> }
>
> Features
> * Allows for a default format (TextConverter)
> ** Text, NullWritable
> *** Text is treated as a comma (",") separated Text array
> **** Consider a Text with values 1 , 2 , 3
> **** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader()
> **** grunt> A = LOAD 'input' USING SequenceFileLoader
> **** grunt> B = FOREACH A GENERATE $3
> **** grunt> 3
> * Allows for custom formats (example: TimeWritableTestLongConverter)
> ** It is up to the custom converter to provide the SequenceFileLoader with
>    the Writables
> *** public abstract void populateTupleList(K time, V value,
>     boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws
>     IOException; in the base class CustomWritableToTupleBaseConverter.
> *** The custom converter has to convert its key/value (as specified by the
>     SequenceFile) into a list of Pig-recognizable data types
> **** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
> **** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
>      f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
>      f7:double);
> **** grunt> B = FILTER A BY f7 + 1 >.5;
> ** Note that Pig has to be told the type of the column for it to do the
>    right conversion. In the above example, if f7 is not defined as double,
>    it will try to cast it into an int, as we are adding 1 to the value.
> ** Note that the custom converter is an argument defined in the DEFINE call.
> * Allows for limiting the number of columns in the input
> ** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
>    f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
>    f7:double);
>
> Any issues anyone sees in this approach?
>
> I have chosen the path of least resistance, so any guidance will be
> appreciated.
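
For readers following the thread, the converter contract described in the quoted message can be illustrated with a small, self-contained sketch. It is not the poster's TextConverter, whose source was not included. The class names BaseConverterSketch and CommaTextConverterSketch are hypothetical, the Hadoop Writable bounds are replaced with plain Java types so the example compiles without Hadoop or Pig on the classpath, and the trimming of fields and the handling of a null or short column mask are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CustomWritableToTupleBaseConverter.
// Assumption: the "extends Writable" bounds are dropped so that the
// sketch has no Hadoop dependency.
abstract class BaseConverterSketch<K, V> {
    abstract void populateTupleList(K key, V value,
            boolean[] requiredColumns, List<Object> protoTuple);
}

// Hypothetical stand-in for the default converter: the key is treated as
// comma-separated text and the value is ignored (the message pairs Text
// with NullWritable for this case).
class CommaTextConverterSketch extends BaseConverterSketch<String, Void> {
    @Override
    void populateTupleList(String key, Void value,
            boolean[] requiredColumns, List<Object> protoTuple) {
        String[] fields = key.split(",");
        for (int i = 0; i < fields.length; i++) {
            // Assumption: a null mask means no projection was pushed down,
            // and columns beyond the end of the mask are dropped.
            boolean wanted = requiredColumns == null
                    || (i < requiredColumns.length && requiredColumns[i]);
            if (wanted) {
                // Assumption: surrounding whitespace is not significant.
                protoTuple.add(fields[i].trim());
            }
        }
    }
}

class ConverterSketchDemo {
    public static void main(String[] args) {
        List<Object> protoTuple = new ArrayList<>();
        // Keep the first and third columns only.
        new CommaTextConverterSketch().populateTupleList(
                "1 , 2 , 3", null, new boolean[] {true, false, true}, protoTuple);
        System.out.println(protoTuple);
    }
}
```

In a real loader the list filled here would be handed to the tuple factory and then cleared, as getNext() does in the quoted code; a custom converter would differ only in how it unpacks its own key and value types.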
