Sorry, Dmitriy.

Let me explain our issue more clearly. Most of our MR jobs use raw
Hadoop (the Java implementation) and create SequenceFiles with varying
custom Writables.
PigStorage is limited to text format, and the piggybank implementation
for SequenceFile loading seems limited, in the sense that it

* does not provide for custom formats (like a TextPair or a Score that
  may use basic Writables such as Text, DoubleWritable, etc.)
* does not provide for type/name mapping (the "AS" clause)
* does not provide for limiting the inputs you may be interested in.

I want to use a Loader that supports something like this:

LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray,
f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);

Now this is well and good, and easy to write, if we have a standard
(Text, NullWritable) SequenceFile with the Text holding comma-separated
columns (almost a PigStorage, but feeding off a SequenceFile).
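
For that simple case, the default converter could look roughly like the
sketch below. This is a minimal, hypothetical sketch: it reuses the
CustomWritableToTupleBaseConverter contract from my earlier mail (quoted
below), but the splitting and column-pruning logic is illustrative, not
the actual code.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Sketch of a default converter: treats the Text key as comma-separated
// columns and ignores the NullWritable value.
public class TextConverter
    extends CustomWritableToTupleBaseConverter<Text, NullWritable> {

  @Override
  public void populateTupleList(Text key, NullWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    String[] fields = key.toString().split(",");
    for (int i = 0; i < fields.length; i++) {
      // Keep a column only if no pruning was requested or it was asked for.
      if (mRequiredColumns == null
          || (i < mRequiredColumns.length && mRequiredColumns[i])) {
        mProtoTuple.add(fields[i].trim());
      }
    }
  }
}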


In cases, though, where we have a SequenceFile of
(CustomWritableKey, CustomWritableValue) and we would still like to
extract the raw types and aggregate on them, the above fails, since
chararray, int, etc. are limited to known types (and I may be wrong here).

What I therefore tried was to reduce the CustomWritables to their raw
types using an injectable converter. This converter takes the
CustomWritable key and value of a SequenceFile and populates an
ArrayList<Object> with the CustomWritables reduced to their base types;
that list is then used to create the Tuple returned from
getNext().
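
For illustration, a converter for a hypothetical SequenceFile of
(TextPair, DoubleWritable) might look like this. TextPair and its
getFirst()/getSecond() accessors are assumptions made up for the
example; only the base-class contract comes from the actual code.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.DoubleWritable;

// Hypothetical converter: reduces a (TextPair, DoubleWritable) record to
// two chararrays and a double, in schema order.
// Column pruning via mRequiredColumns is omitted for brevity.
public class TextPairScoreConverter
    extends CustomWritableToTupleBaseConverter<TextPair, DoubleWritable> {

  @Override
  public void populateTupleList(TextPair key, DoubleWritable value,
      boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
      throws IOException {
    mProtoTuple.add(key.getFirst().toString());   // f1:chararray
    mProtoTuple.add(key.getSecond().toString());  // f2:chararray
    mProtoTuple.add(value.get());                 // f3:double
  }
}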

I think the full code at this paste tells the tale better:

http://pastebin.com/QEwMztjU

On Mon, May 24, 2010 at 3:32 PM, Dmitriy Ryaboy <[email protected]> wrote:

> Vishal,
> I am not sure what your question is. Could you describe your goals and
> challenges before pasting in the implementation? It looks like the bottom
> part of your email, with all the comments, got malformatted, which may be
> the source of my confusion.
>
> Also, various services like pastebin and gist work better for code sharing,
> as they can take care of highlighting and things of that nature, which is
> handy for reviews.
>
> Thanks
> -Dmitriy
>
> On Mon, May 24, 2010 at 9:41 AM, Vishal Santoshi
> <[email protected]>wrote:
>
> > I have this working, so I am seeking validation and corrections.
> > We have SequenceFiles with various CustomWritables in Hadoop, and we
> > want to be able to work with them from within Pig.
> >
> > I have taken PigStorage and the piggybank SequenceFileLoader as a
> > template, and added pluggable converters that are fed through
> > the SequenceFileLoader (which has a default).
> > The below is part of the Java file.
> >
> > public class SequenceFileLoader extends FileInputLoadFunc
> >     implements LoadPushDown {
> >
> >   public SequenceFileLoader() {
> >     converter = new TextConverter();
> >   }
> >
> >   @SuppressWarnings("unchecked")
> >   public SequenceFileLoader(String customWritableToTupleBaseConverter)
> >       throws FrontendException {
> >     try {
> >       converter = (CustomWritableToTupleBaseConverter)
> >           Class.forName(customWritableToTupleBaseConverter).newInstance();
> >     } catch (Exception e) {
> >       throw new FrontendException(e);
> >     }
> >   }
> >
> >   @SuppressWarnings("unchecked")
> >   @Override
> >   public Tuple getNext() throws IOException {
> >     // Lazily pull the pushed-down column list out of the UDF context.
> >     if (!mRequiredColumnsInitialized) {
> >       if (signature != null) {
> >         Properties p =
> >             UDFContext.getUDFContext().getUDFProperties(this.getClass());
> >         mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(
> >             p.getProperty(signature));
> >       }
> >       mRequiredColumnsInitialized = true;
> >     }
> >
> >     boolean next = false;
> >     try {
> >       next = reader.nextKeyValue();
> >     } catch (InterruptedException e) {
> >       throw new IOException(e);
> >     }
> >     if (!next) return null;
> >
> >     key = reader.getCurrentKey();
> >     value = reader.getCurrentValue();
> >
> >     // The pluggable converter reduces the custom key/value to base types.
> >     converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
> >     Tuple t = mTupleFactory.newTuple(mProtoTuple);
> >     mProtoTuple.clear();
> >     return t;
> >   }
> >
> > and the converter base class:
> >
> > public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
> >     V extends Writable> {
> >
> >   public abstract void populateTupleList(K time, V value,
> >       boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
> >       throws IOException;
> > }
> >
> > Features:
> >
> > * Allows for a default format (TextConverter)
> > ** Text, NullWritable
> > *** The Text is treated as a COMMA (",") separated text array
> > **** Consider a Text with values 1, 2, 3
> > **** grunt> DEFINE SequenceFileLoader
> >      com.medialets.hadoop.pig.SequenceFileLoader()
> > **** grunt> A = LOAD 'input' USING SequenceFileLoader
> > **** grunt> B = FOREACH A GENERATE $3
> > **** grunt> 3
> > * Allows for custom formats (example TimeWritableTestLongConverter)
> > ** It is up to the custom converter to provide the SequenceFileLoader with
> >    the Writables, via
> >    public abstract void populateTupleList(K time, V value,
> >    boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws
> >    IOException; in the base class CustomWritableToTupleBaseConverter.
> > *** The custom converter has to convert its Key/Value (as specified by the
> >     SequenceFile) into a List of Pig-recognizable DataTypes
> > **** grunt> DEFINE SequenceFileLoader
> >      a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
> > **** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
> >      f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
> >      f7:double);
> > **** grunt> B = FILTER A BY f7 + 1 > .5;
> > ** Note that Pig has to be told the type of each column for it to do the
> >    right conversion. In the above example, if f7 is not defined as double,
> >    Pig will try to cast it to an int, since we are adding 1 to the value.
> > ** Note that the custom converter is an argument given in the DEFINE call.
> > * Allows for limiting the number of columns in the input
> > ** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray,
> >    f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray,
> >    f7:double);
> >
> >
> > Does anyone see any issues with this approach?
> >
> > I have chosen the path of least resistance, so any guidance will be
> > appreciated.
> >
>
