Wow, that's awesome, thanks very much Josh! -Russ
On Mon, Mar 24, 2014 at 3:41 PM, Josh Elser <[email protected]> wrote:
> Russ,
>
> Check out https://github.com/joshelser/accumulo-column-summing
>
> Using the SummingCombiner with a call to ScannerBase#fetchColumn(Text, Text)
> will be a pretty decent solution for modest data sets. The (better
> articulated than previously) reason why the SummingCombiner is sub-par is
> that it only sums within a single row and not across rows. This is the
> reason why making a custom iterator to sum across rows is desirable.
>
> Below are some results from running the microbenchmark in the test class
> in the above repository. It creates a table with 1M rows, 7 columns per
> row, and sums over a single column. We can lower the split threshold on our
> table to split it out into more Tablets, which should give more realistic
> performance (pay the penalty for the RPC calls that you would at "scale").
> The reduction in the number of keys returned (and thus the amount of data
> over the wire) should be the primary reason this approach is desirable.
>
> Hope this makes things clearer!
>
> Number of splits for table: 65
>
> Number of results to sum: 66
> Time for iterator: 4482 ms
> Number of results to sum: 1000000
> Time for combiner: 4314 ms
>
> Number of results to sum: 66
> Time for iterator: 3651 ms
> Number of results to sum: 1000000
> Time for combiner: 3754 ms
>
> Number of results to sum: 66
> Time for iterator: 3685 ms
> Number of results to sum: 1000000
> Time for combiner: 3839 ms
>
> Number of results to sum: 66
> Time for iterator: 3643 ms
> Number of results to sum: 1000000
> Time for combiner: 4066 ms
>
> Number of results to sum: 66
> Time for iterator: 3880 ms
> Number of results to sum: 1000000
> Time for combiner: 4084 ms
>
>
> On 3/20/14, 9:49 PM, Josh Elser wrote:
>>
>> Russ,
>>
>> Close to it. I'll try to work up some actual code for what I'm suggesting.
>>
>> On 3/20/14, 1:12 AM, Russ Weeks wrote:
>>>
>>> Hi, Josh,
>>>
>>> Thanks for walking me through this. This is my first stab at it:
>>>
>>> import java.io.IOException;
>>>
>>> import org.apache.accumulo.core.data.Key;
>>> import org.apache.accumulo.core.data.Value;
>>> import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>> import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>> import org.apache.accumulo.core.iterators.WrappingIterator;
>>>
>>> public class RowSummingCombiner extends WrappingIterator {
>>>
>>>     Key lastKey;
>>>     long sum;
>>>
>>>     public Key getTopKey() {
>>>         if (lastKey == null)
>>>             return super.getTopKey();
>>>         return lastKey;
>>>     }
>>>
>>>     public Value getTopValue() {
>>>         // Clearing lastKey marks the aggregated pair as consumed.
>>>         lastKey = null;
>>>         return new Value(Long.toString(sum).getBytes());
>>>     }
>>>
>>>     public boolean hasTop() {
>>>         return lastKey != null || super.hasTop();
>>>     }
>>>
>>>     public void next() throws IOException {
>>>         // Consume the entire source, summing as we go; the key reported
>>>         // back is the last key included in the sum.
>>>         while (super.hasTop()) {
>>>             lastKey = super.getTopKey();
>>>             if (!lastKey.isDeleted()) {
>>>                 sum += Long.parseLong(super.getTopValue().toString());
>>>             }
>>>             super.next();
>>>         }
>>>     }
>>>
>>>     public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
>>>         RowSummingCombiner instance = new RowSummingCombiner();
>>>         instance.setSource(getSource().deepCopy(env));
>>>         return instance;
>>>     }
>>> }
>>>
>>> I restrict the scanner to the single CF/CQ that I'm interested in
>>> summing. The biggest disadvantage is that I can't reuse any of the
>>> logic in the Combiner class hierarchy for value decoding, etc., because
>>> the logic to "combine" based on the common (row, cf, cq, vis) tuple is
>>> baked in at the top level of that hierarchy and I don't see an easy way
>>> to plug in new behaviour. But each instance of the RowSummingCombiner
>>> returns its own sum, and then my client just has to add up a handful of
>>> values. Is this what you were getting at?
>>>
>>> Regards,
>>> -Russ
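For reference, a minimal sketch of the scan-time SummingCombiner baseline that the benchmark above measures, assuming string-encoded values as used throughout the thread; the table name, column names, and iterator priority here are illustrative, not taken from the repository:

    import java.util.Collections;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.LongCombiner;
    import org.apache.accumulo.core.iterators.user.SummingCombiner;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;

    public class CombinerBaseline {
        // Sums the meta:size column with a scan-time SummingCombiner. The
        // combiner only merges values sharing a (row, cf, cq, vis) tuple, so
        // one key-value pair still comes back per row and the client must
        // sum across rows.
        public static long sumSizes(Connector conn) throws Exception {
            Scanner scanner = conn.createScanner("docs", new Authorizations());
            scanner.fetchColumn(new Text("meta"), new Text("size"));

            IteratorSetting setting = new IteratorSetting(50, "sum", SummingCombiner.class);
            SummingCombiner.setEncodingType(setting, LongCombiner.Type.STRING);
            SummingCombiner.setColumns(setting,
                Collections.singletonList(new IteratorSetting.Column("meta", "size")));
            scanner.addScanIterator(setting);

            long sum = 0;
            for (Entry<Key,Value> entry : scanner) {
                sum += Long.parseLong(entry.getValue().toString());
            }
            return sum;
        }
    }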
>>>
>>> On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser <[email protected]> wrote:
>>>
>>> Ummm, you got the gist of it (I may have misspoken in what I initially
>>> said).
>>>
>>> My first thought was to make an iterator that will filter down to the
>>> columns that you want. It doesn't look like we have an iterator
>>> included in the core that will efficiently do this for you (although I
>>> know I've done something similar in the past). This iterator would
>>> scan the rows of your table, returning just the columns you want:
>>>
>>> 000200001ccaac30 meta:size [] 1807
>>> 000200001cdaac30 meta:size [] 656
>>> 000200001cfaac30 meta:size [] 565
>>>
>>> Then, we could put the summing combiner on top of that iterator to sum
>>> those and get back a single key. The row in the key you return should
>>> be the last row you included in the sum. This way, if a retry happens
>>> under the hood by the batch scanner, you'll resume where you left off
>>> and won't double-count things.
>>>
>>> (You could even do things like sum a maximum of N rows before
>>> returning back some intermediate count, to better parallelize things.)
>>>
>>> 000200001cfaac30 meta:size [] 3028
>>>
>>> So, each "ScanSession" (what the batch scanner is doing underneath the
>>> hood) would return you a value, and your client would do a final
>>> summation.
>>>
>>> The final stack would be {(data from accumulo) > SKVI to project
>>> columns > summing combiner} > final summation, where {...} denotes
>>> work done server-side. This is one of those things that really shines
>>> with the Accumulo API.
>>>
>>> On 3/19/14, 6:40 PM, Russ Weeks wrote:
>>>
>>> Hi, Josh,
>>>
>>> Thanks very much for your response. I think I get what you're saying,
>>> but it's kind of blowing my mind.
>>>
>>> Are you saying that if I first set up an iterator that took my
>>> key/value pairs like,
>>>
>>> 000200001ccaac30 meta:size [] 1807
>>> 000200001ccaac30 meta:source [] data2
>>> 000200001cdaac30 meta:filename [] doc02985453
>>> 000200001cdaac30 meta:size [] 656
>>> 000200001cdaac30 meta:source [] data2
>>> 000200001cfaac30 meta:filename [] doc04484522
>>> 000200001cfaac30 meta:size [] 565
>>> 000200001cfaac30 meta:source [] data2
>>> 000200001dcaac30 meta:filename [] doc03342958
>>>
>>> and emitted something like,
>>>
>>> 0 meta:size [] 1807
>>> 0 meta:size [] 656
>>> 0 meta:size [] 565
>>>
>>> and then applied a SummingCombiner above that iterator (i.e., at a
>>> higher priority number, so it runs on the iterator's output), then...
>>> it should work, right?
>>>
>>> I'll give it a try.
>>>
>>> Regards,
>>> -Russ
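A minimal sketch of the row-projection step Russ describes above; the class name is illustrative, and it assumes all summed entries share the same column visibility (a Combiner only merges within a (row, cf, cq, vis) tuple) and carry no delete markers:

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.IteratorEnvironment;
    import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
    import org.apache.accumulo.core.iterators.WrappingIterator;
    import org.apache.hadoop.io.Text;

    public class ConstantRowIterator extends WrappingIterator {

        private static final Text ROW = new Text("0");

        @Override
        public Key getTopKey() {
            // Rewrite the row to a constant so a Combiner stacked above this
            // iterator sees one logical row and merges across the real rows.
            Key k = super.getTopKey();
            return new Key(ROW, k.getColumnFamily(), k.getColumnQualifier(),
                k.getColumnVisibility(), k.getTimestamp());
        }

        @Override
        public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
            ConstantRowIterator copy = new ConstantRowIterator();
            copy.setSource(getSource().deepCopy(env));
            return copy;
        }
    }

Stacked below a SummingCombiner (i.e., with a lower priority number, closer to the data), every meta:size entry collapses into a single pair under row "0". Note that the rewritten keys are no longer in sorted order relative to the underlying data, which is tolerable for a scan-time iterator but not for a compaction-time one.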
>>>
>>> On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser <[email protected]> wrote:
>>>
>>> Russ,
>>>
>>> Remember the distribution of data across multiple nodes in your
>>> cluster by tablet.
>>>
>>> A tablet, at the very minimum, will contain one row. Another way to
>>> say the same thing is that a row will never be split across multiple
>>> tablets. The only guarantee you get from Accumulo here is that you can
>>> use a combiner to do your combination across one row.
>>>
>>> However, when you combine (pun not intended) another SKVI with the
>>> Combiner, you can do more merging of that intermediate "combined
>>> value" from each row before returning back to the client. You can
>>> think of this approach as doing a multi-level summation.
>>>
>>> This still requires one final sum on the client side, but you should
>>> get quite a reduction with this approach over doing the entire sum
>>> client-side. You sum the meta:size column in parallel across parts of
>>> the table (server-side), and then client-side you sum the sums from
>>> each part.
>>>
>>> I can sketch this out in more detail if it's not clear. HTH
>>>
>>> On 3/19/14, 6:18 PM, Russ Weeks wrote:
>>>
>>> The accumulo manual states that combiners can be applied to values
>>> which share the same rowID, column family, and column qualifier. Is
>>> there any way to adjust this behaviour? I have rows that look like,
>>>
>>> 000200001ccaac30 meta:size [] 1807
>>> 000200001ccaac30 meta:source [] data2
>>> 000200001cdaac30 meta:filename [] doc02985453
>>> 000200001cdaac30 meta:size [] 656
>>> 000200001cdaac30 meta:source [] data2
>>> 000200001cfaac30 meta:filename [] doc04484522
>>> 000200001cfaac30 meta:size [] 565
>>> 000200001cfaac30 meta:source [] data2
>>> 000200001dcaac30 meta:filename [] doc03342958
>>>
>>> and I'd like to sum up all the values of meta:size across all rows. I
>>> know I can scan the sizes and sum them on the client side, but I was
>>> hoping there would be a way to do this inside my cluster. Is mapreduce
>>> my only option here?
>>>
>>> Thanks,
>>> -Russ
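Putting the pieces of the thread together, a client-side driver for the custom-iterator approach might look like the following; the table name, thread count, and iterator priority are illustrative assumptions, and the RowSummingCombiner class would need to be available on the tablet servers' classpath:

    import java.util.Collections;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.BatchScanner;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;

    public class CrossRowSum {
        public static long sumSizes(Connector conn) throws Exception {
            BatchScanner bs = conn.createBatchScanner("docs", new Authorizations(), 4);
            try {
                bs.setRanges(Collections.singleton(new Range()));
                bs.fetchColumn(new Text("meta"), new Text("size"));

                // Each scan session applies RowSummingCombiner server-side and
                // returns one partial sum keyed by the last row it consumed.
                bs.addScanIterator(new IteratorSetting(50, "rowsum", RowSummingCombiner.class));

                // Final client-side summation over the handful of partial sums.
                long total = 0;
                for (Entry<Key,Value> e : bs) {
                    total += Long.parseLong(e.getValue().toString());
                }
                return total;
            } finally {
                bs.close();
            }
        }
    }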
