I have this working, so I am seeking validation and corrections.
We have SequenceFiles with various custom Writables in Hadoop, and we want
to be able to work with them from within Pig.
I have taken PigStorage and the Piggybank SequenceFileLoader as templates
and added pluggable converters that are fed through the SequenceFileLoader
(which has a default). Below is part of the Java file:
public class SequenceFileLoader extends FileInputLoadFunc
        implements LoadPushDown {

    // Default: expect (Text, NullWritable) pairs and split the Text on commas.
    public SequenceFileLoader() {
        converter = new TextConverter();
    }

    // Pluggable: instantiate the converter class named in the DEFINE call.
    @SuppressWarnings("unchecked")
    public SequenceFileLoader(String customWritableToTupleBaseConverter)
            throws FrontendException {
        try {
            converter = (CustomWritableToTupleBaseConverter)
                    Class.forName(customWritableToTupleBaseConverter).newInstance();
        } catch (Exception e) {
            throw new FrontendException(e);
        }
    }
    @SuppressWarnings("unchecked")
    @Override
    public Tuple getNext() throws IOException {
        // Lazily pull the pushed-down projection out of the UDFContext.
        if (!mRequiredColumnsInitialized) {
            if (signature != null) {
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(
                        p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }
        boolean next = false;
        try {
            next = reader.nextKeyValue();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
        if (!next) return null;
        key = reader.getCurrentKey();
        value = reader.getCurrentValue();
        // Delegate key/value-to-fields conversion to the pluggable converter.
        converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;
    }
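The LoadPushDown methods are omitted above; they follow the PigStorage pattern. A minimal sketch of how getFeatures()/pushProjection() could store the projection that getNext() deserializes, with the exact bookkeeping assumed:

    // Sketch only, modeled on PigStorage: advertise projection push-down and
    // stash the required-column flags where getNext() can find them.
    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException {
        if (requiredFieldList == null || requiredFieldList.getFields() == null) {
            return new RequiredFieldResponse(false);
        }
        // Build a boolean mask over the requested column indexes.
        int lastColumn = -1;
        for (RequiredField rf : requiredFieldList.getFields()) {
            lastColumn = Math.max(lastColumn, rf.getIndex());
        }
        boolean[] requiredColumns = new boolean[lastColumn + 1];
        for (RequiredField rf : requiredFieldList.getFields()) {
            if (rf.getIndex() >= 0) {
                requiredColumns[rf.getIndex()] = true;
            }
        }
        try {
            // Keyed by the loader's signature, matching the lookup in getNext().
            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
            p.setProperty(signature, ObjectSerializer.serialize(requiredColumns));
        } catch (Exception e) {
            throw new FrontendException(e);
        }
        return new RequiredFieldResponse(true);
    }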
The converter base class:
public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
        V extends Writable> {
    // Convert one key/value pair into Pig-recognizable field objects,
    // honoring the pushed-down projection in mRequiredColumns.
    public abstract void populateTupleList(K time, V value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws IOException;
}
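As a concrete illustration, the default TextConverter might look roughly like this; the comma-splitting over (Text, NullWritable) pairs is described in the Features list below, but everything else in the sketch (trimming, emitting DataByteArray) is an assumption:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.pig.data.DataByteArray;

// Sketch of the default converter: the Text key is treated as a
// comma-separated array, one Pig field per element; the value is ignored.
public class TextConverter
        extends CustomWritableToTupleBaseConverter<Text, NullWritable> {

    @Override
    public void populateTupleList(Text text, NullWritable value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException {
        String[] fields = text.toString().split(",");
        for (int i = 0; i < fields.length; i++) {
            // Skip columns Pig did not ask for (projection push-down).
            if (mRequiredColumns == null
                    || (i < mRequiredColumns.length && mRequiredColumns[i])) {
                mProtoTuple.add(new DataByteArray(fields[i].trim()));
            }
        }
    }
}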
Features

* Allows for a default format (TextConverter)
** Key/value types: Text, NullWritable
*** The Text is treated as a COMMA (",") separated array
**** Consider a Text with the values 1 , 2 , 3:
**** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader();
**** grunt> A = LOAD 'input' USING SequenceFileLoader;
**** grunt> B = FOREACH A GENERATE $2;
**** Dumping B yields 3 (positional references are zero-based, so $2 is the third field).
* Allows for custom formats (e.g. TimeWritableTestLongConverter; a hedged sketch follows this list)
** It is up to the custom converter to turn the Writables handed to it by the SequenceFileLoader into tuple fields, via
*** public abstract void populateTupleList(K time, V value, boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws IOException; in the base class CustomWritableToTupleBaseConverter.
*** The custom converter has to convert its key/value (as specified by the SequenceFile) into a list of Pig-recognizable DataTypes.
**** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
**** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
**** grunt> B = FILTER A BY f7 + 1 > .5;
** Note that Pig has to be told the type of each column for it to do the right conversion. In the example above, if f7 is not defined as a double, Pig will try to cast it to an int, since we are adding 1 to the value.
** Note that the custom converter is an argument passed in the DEFINE call.
* Allows for limiting the number of columns in the input
** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
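Here is the hedged custom-converter sketch referenced above. TimeWritable and its getTime() accessor are hypothetical stand-ins (the real custom Writable is not shown here); only the shape of the contract matters:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.LongWritable;

// Hypothetical example converter. TimeWritable is a stand-in for a real
// custom Writable key; the point is emitting Pig-recognizable types.
public class TimeWritableTestLongConverter
        extends CustomWritableToTupleBaseConverter<TimeWritable, LongWritable> {

    @Override
    public void populateTupleList(TimeWritable time, LongWritable value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException {
        if (mRequiredColumns == null
                || (mRequiredColumns.length > 0 && mRequiredColumns[0])) {
            mProtoTuple.add(time.getTime());   // hypothetical accessor, assumed to return a Long
        }
        if (mRequiredColumns == null
                || (mRequiredColumns.length > 1 && mRequiredColumns[1])) {
            mProtoTuple.add(value.get());      // LongWritable -> long, autoboxed to Long
        }
    }
}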
Any issues anyone sees with this approach?
I have chosen the path of least resistance, so any guidance will be
appreciated.