I'm currently writing a Pig Latin program:
A = load 'hbase://mytable1' using my.packages.CustomizeHBaseStorage('VALUE',
'-loadKey true', 'myrowkey1') as (rowkey:chararray, columncontent:map[]);
ABag = foreach A generate flatten(my.packages.MapToBag($1)) as
(output:chararray);
CustomizeHBaseStorage loads the row "myrowkey1", and the map for this row key
is then transformed into a bag. That works fine so far.
ABag now contains some entries. I want to use these entries to load new rows:
every entry in the bag is the row key of a row I have to load next. So I tried
something like this:
X = FOREACH ABag {
    TMP = load 'hbase://mytable2' using
        my.packages.CustomizeHBaseStorage('VALUE', '-loadKey true', '$0') as
        (rowkey:chararray, columncontent:map[]);
    GENERATE (TMP.$0);
}
This does not work because, as far as I know, the load statement is not
allowed inside FOREACH.
So I tried to build my own EvalFunc:
X = FOREACH ABag GENERATE my.packages.MyNewUDF($0);
Here is the Java code for MyNewUDF:
...
public DataBag exec(Tuple input) throws IOException {
    DataBag result = null;
    try {
        result = bagFactory.newDefaultBag();
        // Build a loader for the single row key passed in as the first field.
        CustomizeHBaseStorage loader = new CustomizeHBaseStorage("VALUE",
                "-loadKey true", input.get(0).toString());
        loader.getInputFormat();
        // The NullPointerException is thrown here: prepareToRead() was never
        // called, so the loader has no RecordReader.
        Tuple curTuple = loader.getNext();
        while (curTuple != null) {
            result.add(curTuple);
            curTuple = loader.getNext();
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return result;
}
...
I think this would work, but I get a NullPointerException because the
RecordReader in the HBaseStorage is not initialized when getNext() is
executed. If anybody can tell me how to initialize the RecordReader (and, I
think, the PigSplit too, because it is required by
CustomizeHBaseStorage.prepareToRead(RecordReader reader, PigSplit split)),
or can suggest another approach, I would be thankful.
BTW, I know that I can load the whole of mytable2 into a new alias and then
JOIN ABag with that alias, but I am trying to optimize my program, because it
is not necessary to load the whole of mytable2. I am trying to build a "join"
with information passing: only the rows whose keys appear in ABag should be
fetched.
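To make clearer what I mean by a "join" with information passing, here is a minimal sketch in plain Java. It does not use any Pig or HBase API: a Map stands in for mytable2, and the class name LookupJoinSketch and the method lookupJoin are hypothetical names I made up for illustration. The idea is only that each key from the bag triggers a single-row lookup, so rows that are not referenced are never read.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a Map stands in for mytable2; no HBase or Pig API is used.
class LookupJoinSketch {

    // For each key coming from the bag, fetch only that row (a point lookup)
    // instead of scanning every row of the table and joining afterwards.
    static Map<String, Map<String, String>> lookupJoin(
            List<String> bagKeys, Map<String, Map<String, String>> table) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (String key : bagKeys) {
            Map<String, String> row = table.get(key); // stand-in for a single-row get
            if (row != null) {                          // keys without a matching row are skipped
                result.put(key, row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        table.put("row1", Collections.singletonMap("VALUE:a", "1"));
        table.put("row2", Collections.singletonMap("VALUE:b", "2"));
        table.put("row3", Collections.singletonMap("VALUE:c", "3"));

        // Only row2 is fetched; row1 and row3 are never touched.
        System.out.println(lookupJoin(Arrays.asList("row2", "missing"), table));
    }
}
```

In the real program the Map lookup would have to be replaced by whatever reads a single row from mytable2, which is exactly the part I do not know how to do from inside a UDF.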
Thanks