Max Thomas wrote:
> Suppose you have a workflow like the following (hopefully not too
> uncommon):
> * "main table" - billions of rows, each with on the order of 100 columns
> * Ternary classifier that produces an annotation on each row in the main
> table. Suppose the labels are A, B, and C. Additionally, this analytic
> adds all labels to an index table, which is of the form:
> label : rowId

Can you clarify what you mean by "annotation on each row in the main
table"? Another column in the row?

> to facilitate lookups of a particular type.
> Now, suppose you want to run another analytic over all rows with label
> A, preferably using MapReduce. It seems the options are:
> 1. Create a scanner which retrieves all As from the index table; add
> these row IDs to an AccumuloInputFormat job; launch a MapReduce job with
> a single map phase. Con: the driver program will need a large amount of
> memory to hold all of the row IDs for the range list.

A single node/process doing all of this may become a point of contention
depending on the number of Accumulo nodes in the system. You'd also need
to be careful about batching things and making sure you don't run out of
memory, like you outlined.
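
For what it's worth, here's a rough (untested) sketch of what option #1
could look like. The table names, the index layout (label as the row,
main-table rowId as the column qualifier), and the connection details are
all assumptions on my part:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.mapreduce.Job;

public class LabelDrivenJobSetup {
  public static void main(String[] args) throws Exception {
    Connector conn = new ZooKeeperInstance("instance", "zk1:2181")
        .getConnector("user", new PasswordToken("secret"));

    // Pull every rowId tagged with label "A" out of the index table.
    Scanner indexScanner = conn.createScanner("index_table", Authorizations.EMPTY);
    indexScanner.setRange(Range.exact("A"));
    List<Range> ranges = new ArrayList<Range>();
    for (Map.Entry<Key,Value> entry : indexScanner) {
      ranges.add(Range.exact(entry.getKey().getColumnQualifier().toString()));
    }

    // Map-only job over the main table, restricted to the rows we just looked up.
    // The whole range list lives in the driver's memory -- the con mentioned above.
    Job job = Job.getInstance();
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.setZooKeeperInstance(job, "instance", "zk1:2181");
    AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("secret"));
    AccumuloInputFormat.setInputTableName(job, "main_table");
    AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
    AccumuloInputFormat.setRanges(job, ranges);
    job.setNumReduceTasks(0);
    // job.setMapperClass(...) -- plug in the analytic's mapper here
    job.waitForCompletion(true);
  }
}

Batching could be layered on top of that by cutting the range list into
chunks and launching one job per chunk, at the cost of extra job-startup
overhead.
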
> 2. A MapReduce job over the index table, with a Reduce phase where each
> reducer has a collection of row IDs to iterate over. Each reducer then
> retrieves its assigned rows and runs over them.

This should work and is likely the easiest way to run this type of
algorithm over multiple nodes. IMO, this is getting close to the point
of using MapReduce as a hammer to drive a screw. It will work, but it's
not an ideal solution.
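
As a sketch of what that could look like (again untested, with the same
assumed table layout), the job could pair a mapper over the index table
with a reducer that pulls its rows using a BatchScanner:

import java.io.IOException;
import java.util.Collections;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class IndexDrivenAnalytic {

  // Map side: AccumuloInputFormat over the index table, restricted to label "A".
  // Emits the main-table rowId (assumed to be the column qualifier); the shuffle
  // spreads the rowIds across reducers.
  public static class IndexMapper extends Mapper<Key, Value, Text, NullWritable> {
    @Override
    protected void map(Key key, Value value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(key.getColumnQualifier()), NullWritable.get());
    }
  }

  // Reduce side: each reducer fetches its assigned rows from the main table and
  // runs the analytic over them. Collecting several rowIds per BatchScanner call
  // would cut round trips; this is the simplest form.
  public static class FetchAndAnalyzeReducer extends Reducer<Text, NullWritable, Text, Text> {
    private Connector conn;

    @Override
    protected void setup(Context context) throws IOException {
      try {
        conn = new ZooKeeperInstance("instance", "zk1:2181")
            .getConnector("user", new PasswordToken("secret"));
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    @Override
    protected void reduce(Text rowId, Iterable<NullWritable> ignored, Context context)
        throws IOException, InterruptedException {
      try {
        BatchScanner scanner = conn.createBatchScanner("main_table", Authorizations.EMPTY, 4);
        scanner.setRanges(Collections.singleton(Range.exact(rowId)));
        for (Entry<Key,Value> cell : scanner) {
          // the real analytic would go here; this just re-emits what was read
          context.write(new Text(cell.getKey().getRow()), new Text(cell.getValue().get()));
        }
        scanner.close();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }
}

The driver would point AccumuloInputFormat at the index table with a single
Range.exact("A"), much like the option #1 snippet above, but without ever
materializing every rowId on one machine.
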
> 3. Run over the entire main table with a naive filter to check
> classification type. Con: hits every row, many of which aren't going to
> match.
> 4. AccumuloMultiTableInputFormat, Filters/Iterators - these don't seem
> appropriate here.
> It seems option #2 is ideal, with option #1 possibly working out too.
> But, I want to make sure I'm not missing something, as it doesn't seem
> possible to set up a workflow where the index table is hit, row IDs are
> retrieved, and those row IDs are then passed to another MapReduce job
> that reads a different table (obviously one could create a BatchScanner
> given the inputs anywhere). Are there any examples that cover this? Or
> does anyone have a few suggestions about how to set up such a workflow?

I like to think of these in terms of producer/consumer problems.
0) Take a label and start a query over the index table
1) Produce row IDs from your index table
2) For each consumed row ID, run a query against the main table
3) Collect the rows from the main table query (transform/massage them
for presentation)
Steps 1 and 2 can be disjoint from one another. As you get more row IDs,
you shoot them over to some other process that takes them and runs a
query against the main table.
Trivially, you could do this via a thread pool, but that will still be
limited to a single node's capabilities (which may or may not be
sufficient). To scale beyond that, something like a custom YARN
application, with a messaging system (maybe Kafka?) acting as the
transport between 1 and 2, could work. It's a lot more work to write than
a MapReduce job would be, but you have a lot more control over things
and it's a bit more elegant IMO.
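
For the single-node version, a rough (untested) sketch of that thread-pool
split might look like this; the table names, index layout, batch size, and
the empty-batch shutdown signal are all just illustrative choices:

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class LabelQueryPipeline {
  private static final int CONSUMERS = 8;

  public static void main(String[] args) throws Exception {
    Connector conn = new ZooKeeperInstance("instance", "zk1:2181")
        .getConnector("user", new PasswordToken("secret"));

    BlockingQueue<List<Range>> batches = new ArrayBlockingQueue<List<Range>>(64);
    ExecutorService pool = Executors.newFixedThreadPool(CONSUMERS);

    // Consumers (steps 2/3): pull a batch of rowId ranges, fetch those rows from
    // the main table, and massage them for presentation. An empty batch means stop.
    for (int i = 0; i < CONSUMERS; i++) {
      pool.submit(() -> {
        try {
          List<Range> work;
          while (!(work = batches.take()).isEmpty()) {
            BatchScanner bs = conn.createBatchScanner("main_table", Authorizations.EMPTY, 4);
            bs.setRanges(work);
            for (Entry<Key,Value> cell : bs) {
              // transform/massage the row for presentation here
            }
            bs.close();
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
    }

    // Producer (steps 0/1): stream rowIds for label "A" out of the index table
    // in batches of 1000 so nothing has to hold the whole list in memory.
    Scanner index = conn.createScanner("index_table", Authorizations.EMPTY);
    index.setRange(Range.exact("A"));
    List<Range> batch = new ArrayList<Range>();
    for (Entry<Key,Value> entry : index) {
      batch.add(Range.exact(entry.getKey().getColumnQualifier().toString()));
      if (batch.size() == 1000) {
        batches.put(batch);
        batch = new ArrayList<Range>();
      }
    }
    if (!batch.isEmpty()) {
      batches.put(batch);
    }
    for (int i = 0; i < CONSUMERS; i++) {
      batches.put(new ArrayList<Range>());  // stop signal, one per consumer
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}

The YARN/Kafka version is the same split, just with the in-memory queue
replaced by the message bus and the consumers spread across containers.
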
> Another answer might very well be: this is a wacky table/indexing setup,
> which I am very amenable to hearing. But to a naive Accumulo user, having
> an index table seems OK - I think it is also covered in the manual.

Index tables are a very good idea. You should use them :). Accumulo only
provides a single sort order, and often that isn't sufficient for the
kinds of queries you want to support over your data. A secondary index is
an easy way to get additional access paths, and it works out well given
the availability of lots of space on your Hadoop cluster.
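
To make that concrete, here's a small, illustrative sketch of how the
classifier might write both the main-table annotation and the index entry.
The column names, table names, and the choice to put the rowId in the index
entry's column qualifier are assumptions:

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class IndexedWriteExample {
  public static void main(String[] args) throws Exception {
    Connector conn = new ZooKeeperInstance("instance", "zk1:2181")
        .getConnector("user", new PasswordToken("secret"));

    String rowId = "row-000042";
    String label = "A";  // output of the ternary classifier

    // Main table: store the classifier's label as one more column on the row.
    BatchWriter main = conn.createBatchWriter("main_table", new BatchWriterConfig());
    Mutation dataRow = new Mutation(rowId);
    dataRow.put(new Text("meta"), new Text("label"), new Value(label.getBytes()));
    main.addMutation(dataRow);
    main.close();

    // Index table: row = label, column qualifier = rowId, so a single exact-row
    // scan on the label returns every rowId carrying that label.
    BatchWriter index = conn.createBatchWriter("index_table", new BatchWriterConfig());
    Mutation indexEntry = new Mutation(label);
    indexEntry.put(new Text(""), new Text(rowId), new Value(new byte[0]));
    index.addMutation(indexEntry);
    index.close();
  }
}

With that layout, the Range.exact(label) scans in the earlier snippets
return exactly the rowIds for one label without touching the main table.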