I have this working, so I am seeking validation and corrections.
We have SequenceFiles with various custom Writables in Hadoop, and we want
to be able to work with them from within Pig.

I have taken PigStorage and the piggybank SequenceFileLoader as a template
and added pluggable converters that are fed through
the SequenceFileLoader (which has a default).
Below is part of the Java file.

public class SequenceFileLoader extends FileInputLoadFunc
        implements LoadPushDown {

    public SequenceFileLoader() {
        // Default converter: treats the Text as a comma-separated array.
        converter = new TextConverter();
    }

    @SuppressWarnings("unchecked")
    public SequenceFileLoader(String customWritableToTupleBaseConverter)
            throws FrontendException {
        try {
            // Instantiate the converter class named in the DEFINE call.
            converter = (CustomWritableToTupleBaseConverter)
                    Class.forName(customWritableToTupleBaseConverter).newInstance();
        } catch (Exception e) {
            throw new FrontendException(e);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public Tuple getNext() throws IOException {
        // Lazily restore the required-column mask stored under this loader's signature.
        if (!mRequiredColumnsInitialized) {
            if (signature != null) {
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(
                        p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }

        boolean next = false;
        try {
            next = reader.nextKeyValue();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }

        if (!next) return null;

        key = reader.getCurrentKey();
        value = reader.getCurrentValue();

        // The converter fills mProtoTuple with the fields for this record.
        converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;
    }



and

public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
        V extends Writable> {

    public abstract void populateTupleList(K time, V value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException;
}
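
One point I would like checked is the reflective lookup in the String constructor. Below is a minimal sketch of how it could be tightened, under assumptions: BaseConverter and EchoConverter are hypothetical stand-ins for CustomWritableToTupleBaseConverter and a real converter, and plain Strings stand in for the Writables so that the snippet runs without Hadoop or Pig on the classpath. It uses asSubclass so that a wrong class name fails with a clear message rather than a ClassCastException at the cast, and getDeclaredConstructor().newInstance() in place of Class.newInstance(), which is deprecated in newer JDKs.

```java
import java.util.ArrayList;
import java.util.List;

final class ConverterLoading {

    /** Hypothetical stand-in for CustomWritableToTupleBaseConverter. */
    public abstract static class BaseConverter {
        public abstract void populate(String key, String value, List<Object> out);
    }

    /** Hypothetical converter; reflection needs its public no-arg constructor. */
    public static class EchoConverter extends BaseConverter {
        @Override
        public void populate(String key, String value, List<Object> out) {
            out.add(key);
            out.add(value);
        }
    }

    /** Loads a converter by class name and rejects classes of the wrong type. */
    static BaseConverter load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(BaseConverter.class)   // type check before instantiation
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException(
                    "Cannot load converter '" + className + "'", e);
        }
    }

    public static void main(String[] args) {
        BaseConverter c = load(EchoConverter.class.getName());
        List<Object> out = new ArrayList<>();
        c.populate("k", "v", out);
        System.out.println(out);
    }
}
```

In the real loader the IllegalArgumentException would presumably be replaced by the FrontendException already thrown there.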



Features

* Allows for a default format (TextConverter)
** Text, NullWritable
*** Text is treated as a comma (",") separated Text array
**** Consider a Text with values 1 , 2 , 3
**** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader()
**** grunt> A = LOAD 'input' USING SequenceFileLoader
**** grunt> B = FOREACH A GENERATE $3
**** grunt> 3
* Allows for custom formats (example: TimeWritableTestLongConverter)
** It is up to the custom converter to provide the SequenceFileLoader with the Writables
*** public abstract void populateTupleList(K time, V value, boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws IOException; in the base class CustomWritableToTupleBaseConverter.
*** The custom converter has to convert its key/value (as specified by the SequenceFile) into a list of Pig-recognizable data types
**** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
**** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
**** grunt> B = FILTER A BY f7 + 1 >.5;
** Note that Pig has to be told the type of the column for it to do the right conversion. In the above example, if f7 is not defined as double, Pig will try to cast it to an int, because we are adding 1 to the value.
** Note that the custom converter is an argument defined in the DEFINE call.
* Allows for limiting the number of columns in the input
** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
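
To make the default format and the column limiting concrete, here is a rough sketch of what a comma-splitting converter could do with the mRequiredColumns mask. It is an illustration under assumptions, not the actual TextConverter: the class name CommaSeparatedConverterSketch is hypothetical, a plain String stands in for the Hadoop Text value so the snippet runs on its own, and the key is ignored. A null mask is taken to mean "keep every column", the same kind of check PigStorage makes.

```java
import java.util.ArrayList;
import java.util.List;

final class CommaSeparatedConverterSketch {

    /**
     * Splits a comma-separated value and appends the fields selected by
     * requiredColumns to protoTuple. A null mask keeps every field; fields
     * beyond the end of the mask are dropped.
     */
    static void populateTupleList(String value, boolean[] requiredColumns,
                                  List<Object> protoTuple) {
        // Limit -1 keeps trailing empty fields so column positions stay stable.
        String[] fields = value.split(",", -1);
        for (int i = 0; i < fields.length; i++) {
            boolean wanted = requiredColumns == null
                    || (i < requiredColumns.length && requiredColumns[i]);
            if (wanted) {
                protoTuple.add(fields[i].trim());
            }
        }
    }

    public static void main(String[] args) {
        List<Object> all = new ArrayList<>();
        populateTupleList("1 , 2 , 3", null, all);
        System.out.println(all);

        List<Object> projected = new ArrayList<>();
        populateTupleList("1 , 2 , 3", new boolean[] {true, false, true}, projected);
        System.out.println(projected);
    }
}
```

Every field comes out as a string here, which is why the AS clause in the LOAD statement matters for typed columns such as f7:double.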


Does anyone see any issues with this approach?

I have chosen the path of least resistance, so any guidance will be
appreciated.
