Re: Combiner behaviour

2014-03-25 Thread Russ Weeks
Wow, that's awesome, thanks very much Josh!

-Russ


On Mon, Mar 24, 2014 at 3:41 PM, Josh Elser josh.el...@gmail.com wrote:

 Russ,

 Check out https://github.com/joshelser/accumulo-column-summing

 Using the SummingCombiner with a call to ScannerBase#fetchColumn(Text,Text)
 will be a pretty decent solution for modest data sets. The (better
 articulated than previously) reason why the SummingCombiner is sub-par is
 that it only sums within a single row and not across rows. This is the
 reason why making a custom iterator to sum across rows is desirable.

 Below are some results; you can try running this microbenchmark from the test
 class in the above repository. It creates a table with 1M rows, 7 columns per
 row, and sums over a single column. We can lower the split threshold on the
 table to split it out into more Tablets, which should give more realistic
 performance (paying the penalty for the RPC calls that you would at scale).
 The reduction in the number of keys returned (and thus the amount of data
 over the wire) should be the primary reason this approach is desirable.

 Hope this makes things clearer!

 Number of splits for table: 65
 Number of results to sum: 66
 Time for iterator: 4482 ms
 Number of results to sum: 100
 Time for combiner: 4314 ms

 Number of results to sum: 66
 Time for iterator: 3651 ms
 Number of results to sum: 100
 Time for combiner: 3754 ms

 Number of results to sum: 66
 Time for iterator: 3685 ms
 Number of results to sum: 100
 Time for combiner: 3839 ms

 Number of results to sum: 66
 Time for iterator: 3643 ms
 Number of results to sum: 100
 Time for combiner: 4066 ms

 Number of results to sum: 66
 Time for iterator: 3880 ms
 Number of results to sum: 100
 Time for combiner: 4084 ms


 On 3/20/14, 9:49 PM, Josh Elser wrote:

 Russ,

 Close to it. I'll try to work up some actual code to what I'm suggesting.

 On 3/20/14, 1:12 AM, Russ Weeks wrote:

 Hi, Josh,

 Thanks for walking me through this.  This is my first stab at it:

 public class RowSummingCombiner extends WrappingIterator {
   Key lastKey;
   long sum;

   public Key getTopKey() {
     if (lastKey == null)
       return super.getTopKey();
     return lastKey;
   }

   public Value getTopValue() {
     lastKey = null;
     return new Value(Long.toString(sum).getBytes());
   }

   public boolean hasTop() {
     return lastKey != null || super.hasTop();
   }

   public void next() throws IOException {
     while (super.hasTop()) {
       lastKey = super.getTopKey();
       if (!lastKey.isDeleted()) {
         sum += Long.parseLong(super.getTopValue().toString());
       }
       super.next();
     }
   }

   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     RowSummingCombiner instance = new RowSummingCombiner();
     instance.setSource(getSource().deepCopy(env));
     return instance;
   }
 }

 I restrict the scanner to the single CF/CQ that I'm interested in
 summing. The biggest disadvantage is that I can't utilize any of the
 logic in the Combiner class hierarchy for value decoding etc. because
 the logic to combine based on the common (row, cf, cq, vis) tuple is
 baked in at the top level of that hierarchy and I don't see an easy way
 to plug in new behaviour. But, each instance of the RowSummingCombiner
 returns its own sum, and then my client just has to add up a handful of
 values. Is this what you were getting at?

 Regards,
 -Russ


 On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser josh.el...@gmail.com wrote:

 Ummm, you got the gist of it (I may have misspoken in what I initially said).

 My first thought was to make an iterator that filters down to just the
 columns you want. It doesn't look like the core includes an iterator that
 will do this for you efficiently (although I know I've done something
 similar in the past). This iterator would scan the rows of your table,
 returning just the columns you want.

 00021ccaac30 meta:size []1807
 00021cdaac30 meta:size []656
 00021cfaac30 meta:size []565

 Then, we could put the summing combiner on top of that iterator to sum
 those and get back a single key. The row in the key you return should be
 the last row you included in the sum. This way, if the batchscanner retries
 under the hood, you'll resume where you left off and won't double-count
 anything.

 (you could even do things like sum a maximum of N rows before
 returning back some intermediate count to better parallelize things)

 00021cfaac30 meta:size []3028

 So, each ScanSession (what the batchscanner is doing underneath the hood)
 would return you a value over which your client would do a final summation.

 The final stack would be {(data from Accumulo) -> SKVI to project columns ->
 summing combiner} -> final summation, where {...} denotes work done
 server-side. This is one of those things where the Accumulo API really
 shines.


 On 3/19/14, 6:40 

Re: Combiner behaviour

2014-03-24 Thread Josh Elser

Russ,

Check out https://github.com/joshelser/accumulo-column-summing

Using the SummingCombiner with a call to 
ScannerBase#fetchColumn(Text,Text) will be a pretty decent solution for 
modest data sets. The (better articulated than previously) reason why 
the SummingCombiner is sub-par is that it only sums within a single row 
and not across rows. This is the reason why making a custom iterator to 
sum across rows is desirable.


Below are some results; you can try running this microbenchmark from the test
class in the above repository. It creates a table with 1M rows, 7 columns per
row, and sums over a single column. We can lower the split threshold on the
table to split it out into more Tablets, which should give more realistic
performance (paying the penalty for the RPC calls that you would at scale).
The reduction in the number of keys returned (and thus the amount of data
over the wire) should be the primary reason this approach is desirable.
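
A minimal sketch of that approach (a scan-time SummingCombiner plus
fetchColumn), assuming an existing Connector, a hypothetical table name
"docs", and string-encoded long values as in the sample rows in this thread;
imports from the standard Accumulo client packages are omitted for brevity:

IteratorSetting setting = new IteratorSetting(30, "sumSize", SummingCombiner.class);
SummingCombiner.setColumns(setting,
    Collections.singletonList(new IteratorSetting.Column("meta", "size")));
SummingCombiner.setEncodingType(setting, LongCombiner.Type.STRING);

Scanner scanner = connector.createScanner("docs", Authorizations.EMPTY);
scanner.addScanIterator(setting);
scanner.fetchColumn(new Text("meta"), new Text("size"));

// The combiner only collapses entries within a row, so one summed entry
// comes back per row and the client still has to add across rows.
long total = 0;
for (Map.Entry<Key,Value> entry : scanner) {
  total += Long.parseLong(entry.getValue().toString());
}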


Hope this makes things clearer!

Number of splits for table: 65
Number of results to sum: 66
Time for iterator: 4482 ms
Number of results to sum: 100
Time for combiner: 4314 ms

Number of results to sum: 66
Time for iterator: 3651 ms
Number of results to sum: 100
Time for combiner: 3754 ms

Number of results to sum: 66
Time for iterator: 3685 ms
Number of results to sum: 100
Time for combiner: 3839 ms

Number of results to sum: 66
Time for iterator: 3643 ms
Number of results to sum: 100
Time for combiner: 4066 ms

Number of results to sum: 66
Time for iterator: 3880 ms
Number of results to sum: 100
Time for combiner: 4084 ms

On 3/20/14, 9:49 PM, Josh Elser wrote:

Russ,

Close to it. I'll try to work up some actual code to what I'm suggesting.

On 3/20/14, 1:12 AM, Russ Weeks wrote:

Hi, Josh,

Thanks for walking me through this.  This is my first stab at it:

public class RowSummingCombiner extends WrappingIterator {
  Key lastKey;
  long sum;

  public Key getTopKey() {
    if (lastKey == null)
      return super.getTopKey();
    return lastKey;
  }

  public Value getTopValue() {
    lastKey = null;
    return new Value(Long.toString(sum).getBytes());
  }

  public boolean hasTop() {
    return lastKey != null || super.hasTop();
  }

  public void next() throws IOException {
    while (super.hasTop()) {
      lastKey = super.getTopKey();
      if (!lastKey.isDeleted()) {
        sum += Long.parseLong(super.getTopValue().toString());
      }
      super.next();
    }
  }

  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    RowSummingCombiner instance = new RowSummingCombiner();
    instance.setSource(getSource().deepCopy(env));
    return instance;
  }
}

I restrict the scanner to the single CF/CQ that I'm interested in
summing. The biggest disadvantage is that I can't utilize any of the
logic in the Combiner class hierarchy for value decoding etc. because
the logic to combine based on the common (row, cf, cq, vis) tuple is
baked in at the top level of that hierarchy and I don't see an easy way
to plug in new behaviour. But, each instance of the RowSummingCombiner
returns its own sum, and then my client just has to add up a handful of
values. Is this what you were getting at?

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser josh.el...@gmail.com wrote:

Ummm, you got the gist of it (I may have misspoken in what I initially said).

My first thought was to make an iterator that filters down to just the
columns you want. It doesn't look like the core includes an iterator that
will do this for you efficiently (although I know I've done something
similar in the past). This iterator would scan the rows of your table,
returning just the columns you want.

00021ccaac30 meta:size []1807
00021cdaac30 meta:size []656
00021cfaac30 meta:size []565

Then, we could put the summing combiner on top of that iterator to sum
those and get back a single key. The row in the key you return should be
the last row you included in the sum. This way, if the batchscanner retries
under the hood, you'll resume where you left off and won't double-count
anything.

(you could even do things like sum a maximum of N rows before
returning back some intermediate count to better parallelize things)

00021cfaac30 meta:size []3028

So, each ScanSession (what the batchscanner is doing underneath the hood)
would return you a value over which your client would do a final summation.

The final stack would be {(data from Accumulo) -> SKVI to project columns ->
summing combiner} -> final summation, where {...} denotes work done
server-side. This is one of those things where the Accumulo API really
shines.


On 3/19/14, 6:40 PM, Russ Weeks wrote:

Hi, Josh,

Thanks very much for your response. I think I get what you're
saying,
but it's kind of blowing my mind.

Are you saying that if I first set up an 

Re: Combiner behaviour

2014-03-20 Thread Josh Elser

Russ,

Close to it. I'll try to work up some actual code to what I'm suggesting.

On 3/20/14, 1:12 AM, Russ Weeks wrote:

Hi, Josh,

Thanks for walking me through this.  This is my first stab at it:

public class RowSummingCombiner extends WrappingIterator {
  Key lastKey;
  long sum;

  public Key getTopKey() {
    if (lastKey == null)
      return super.getTopKey();
    return lastKey;
  }

  public Value getTopValue() {
    lastKey = null;
    return new Value(Long.toString(sum).getBytes());
  }

  public boolean hasTop() {
    return lastKey != null || super.hasTop();
  }

  public void next() throws IOException {
    while (super.hasTop()) {
      lastKey = super.getTopKey();
      if (!lastKey.isDeleted()) {
        sum += Long.parseLong(super.getTopValue().toString());
      }
      super.next();
    }
  }

  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    RowSummingCombiner instance = new RowSummingCombiner();
    instance.setSource(getSource().deepCopy(env));
    return instance;
  }
}

I restrict the scanner to the single CF/CQ that I'm interested in
summing. The biggest disadvantage is that I can't utilize any of the
logic in the Combiner class hierarchy for value decoding etc. because
the logic to combine based on the common (row, cf, cq, vis) tuple is
baked in at the top level of that hierarchy and I don't see an easy way
to plug in new behaviour. But, each instance of the RowSummingCombiner
returns its own sum, and then my client just has to add up a handful of
values. Is this what you were getting at?

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser josh.el...@gmail.com wrote:

Ummm, you got the gist of it (I may have misspoken in what I initially said).

My first thought was to make an iterator that filters down to just the
columns you want. It doesn't look like the core includes an iterator that
will do this for you efficiently (although I know I've done something
similar in the past). This iterator would scan the rows of your table,
returning just the columns you want.

00021ccaac30 meta:size []1807
00021cdaac30 meta:size []656
00021cfaac30 meta:size []565

Then, we could put the summing combiner on top of that iterator to sum
those and get back a single key. The row in the key you return should be
the last row you included in the sum. This way, if the batchscanner retries
under the hood, you'll resume where you left off and won't double-count
anything.

(you could even do things like sum a maximum of N rows before
returning back some intermediate count to better parallelize things)

00021cfaac30 meta:size []3028

So, each ScanSession (what the batchscanner is doing underneath the hood)
would return you a value over which your client would do a final summation.

The final stack would be {(data from Accumulo) -> SKVI to project columns ->
summing combiner} -> final summation, where {...} denotes work done
server-side. This is one of those things where the Accumulo API really
shines.


On 3/19/14, 6:40 PM, Russ Weeks wrote:

Hi, Josh,

Thanks very much for your response. I think I get what you're
saying,
but it's kind of blowing my mind.

Are you saying that if I first set up an iterator that took my
key/value
pairs like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

And emitted something like,

0 meta:size [] 1807
0 meta:size [] 656
0 meta:size [] 565

And then applied a SummingCombiner at a lower priority than that
iterator, then... it should work, right?

I'll give it a try.

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser josh.el...@gmail.com wrote:

 Russ,

 Remember that data is distributed across multiple nodes in your cluster
 by tablet.

 A tablet, at the very minimum, will contain one row. Another way to say
 the same thing is that a row will never be split across multiple tablets.
 The only guarantee you get from Accumulo here is that you can use a
 combiner to do your combination across one row.

 However, when you combine (pun not intended) another SKVI
with the
 Combiner, you can do more merging of that intermediate

Combiner behaviour

2014-03-19 Thread Russ Weeks
The accumulo manual states that combiners can be applied to values which
share the same rowID, column family, and column qualifier. Is there any way
to adjust this behaviour? I have rows that look like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

and I'd like to sum up all the values of meta:size across all rows.  I know
I can scan the sizes and sum them on the client side, but I was hoping
there would be a way to do this inside my cluster. Is mapreduce my only
option here?

Thanks,
-Russ


Re: Combiner behaviour

2014-03-19 Thread Josh Elser

Russ,

Remember that data is distributed across multiple nodes in your cluster by
tablet.


A tablet, at the very minimum, will contain one row. Another way to say the
same thing is that a row will never be split across multiple tablets. The
only guarantee you get from Accumulo here is that you can use a combiner to
do your combination across one row.


However, when you combine (pun not intended) another SKVI with the 
Combiner, you can do more merging of that intermediate combined value 
from each row before returning back to the client. You can think of this 
approach as doing a multi-level summation.


This still requires one final sum on the client side, but you should get 
quite the reduction with this approach over doing the entire sum client 
side. You sum the meta:size column in parallel across parts of the table 
(server-side) and then client-side you sum the sums from each part.
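
For example, with the meta:size values from the sample rows below, one part
of the table might hand back the partial sum 1807 + 656 = 2463 and another
part 565; the client then only has to compute 2463 + 565 = 3028.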


I can sketch this out in more detail if it's not clear. HTH

On 3/19/14, 6:18 PM, Russ Weeks wrote:

The accumulo manual states that combiners can be applied to values which
share the same rowID, column family, and column qualifier. Is there any
way to adjust this behaviour? I have rows that look like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

and I'd like to sum up all the values of meta:size across all rows.  I
know I can scan the sizes and sum them on the client side, but I was
hoping there would be a way to do this inside my cluster. Is mapreduce
my only option here?

Thanks,
-Russ


Re: Combiner behaviour

2014-03-19 Thread Russ Weeks
Hi, Josh,

Thanks very much for your response. I think I get what you're saying, but
it's kind of blowing my mind.

Are you saying that if I first set up an iterator that took my key/value
pairs like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

And emitted something like,

0 meta:size [] 1807
0 meta:size [] 656
0 meta:size [] 565

And then applied a SummingCombiner at a lower priority than that iterator,
then... it should work, right?

I'll give it a try.

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser josh.el...@gmail.com wrote:

 Russ,

 Remember that data is distributed across multiple nodes in your cluster by
 tablet.

 A tablet, at the very minimum, will contain one row. Another way to say the
 same thing is that a row will never be split across multiple tablets. The
 only guarantee you get from Accumulo here is that you can use a combiner to
 do your combination across one row.

 However, when you combine (pun not intended) another SKVI with the
 Combiner, you can do more merging of that intermediate combined value
 from each row before returning back to the client. You can think of this
 approach as doing a multi-level summation.

 This still requires one final sum on the client side, but you should get
 quite the reduction with this approach over doing the entire sum client
 side. You sum the meta:size column in parallel across parts of the table
 (server-side) and then client-side you sum the sums from each part.

 I can sketch this out in more detail if it's not clear. HTH


 On 3/19/14, 6:18 PM, Russ Weeks wrote:

 The accumulo manual states that combiners can be applied to values which
 share the same rowID, column family, and column qualifier. Is there any
 way to adjust this behaviour? I have rows that look like,

 00021ccaac30 meta:size []1807
 00021ccaac30 meta:source []data2
 00021cdaac30 meta:filename []doc02985453
 00021cdaac30 meta:size []656
 00021cdaac30 meta:source []data2
 00021cfaac30 meta:filename []doc04484522
 00021cfaac30 meta:size []565
 00021cfaac30 meta:source []data2
 00021dcaac30 meta:filename []doc03342958

 and I'd like to sum up all the values of meta:size across all rows.  I
 know I can scan the sizes and sum them on the client side, but I was
 hoping there would be a way to do this inside my cluster. Is mapreduce
 my only option here?

 Thanks,
 -Russ




Re: Combiner behaviour

2014-03-19 Thread John Vines
Be careful when changing row values, especially outside of the tablet
range, as I believe it can cause the data to be dropped or rejected.


On Wed, Mar 19, 2014 at 6:40 PM, Russ Weeks rwe...@newbrightidea.com wrote:

 Hi, Josh,

 Thanks very much for your response. I think I get what you're saying, but
 it's kind of blowing my mind.

 Are you saying that if I first set up an iterator that took my key/value
 pairs like,

 00021ccaac30 meta:size []1807
 00021ccaac30 meta:source []data2
 00021cdaac30 meta:filename []doc02985453
 00021cdaac30 meta:size []656
 00021cdaac30 meta:source []data2
 00021cfaac30 meta:filename []doc04484522
 00021cfaac30 meta:size []565
 00021cfaac30 meta:source []data2
 00021dcaac30 meta:filename []doc03342958

 And emitted something like,

 0 meta:size [] 1807
 0 meta:size [] 656
 0 meta:size [] 565

 And then applied a SummingCombiner at a lower priority than that iterator,
 then... it should work, right?

 I'll give it a try.

 Regards,
 -Russ


 On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser josh.el...@gmail.com wrote:

 Russ,

 Remember that data is distributed across multiple nodes in your cluster by
 tablet.

 A tablet, at the very minimum, will contain one row. Another way to say the
 same thing is that a row will never be split across multiple tablets. The
 only guarantee you get from Accumulo here is that you can use a combiner to
 do your combination across one row.

 However, when you combine (pun not intended) another SKVI with the
 Combiner, you can do more merging of that intermediate combined value
 from each row before returning back to the client. You can think of this
 approach as doing a multi-level summation.

 This still requires one final sum on the client side, but you should get
 quite the reduction with this approach over doing the entire sum client
 side. You sum the meta:size column in parallel across parts of the table
 (server-side) and then client-side you sum the sums from each part.

 I can sketch this out in more detail if it's not clear. HTH


 On 3/19/14, 6:18 PM, Russ Weeks wrote:

 The accumulo manual states that combiners can be applied to values which
 share the same rowID, column family, and column qualifier. Is there any
 way to adjust this behaviour? I have rows that look like,

 00021ccaac30 meta:size []1807
 00021ccaac30 meta:source []data2
 00021cdaac30 meta:filename []doc02985453
 00021cdaac30 meta:size []656
 00021cdaac30 meta:source []data2
 00021cfaac30 meta:filename []doc04484522
 00021cfaac30 meta:size []565
 00021cfaac30 meta:source []data2
 00021dcaac30 meta:filename []doc03342958

 and I'd like to sum up all the values of meta:size across all rows.  I
 know I can scan the sizes and sum them on the client side, but I was
 hoping there would be a way to do this inside my cluster. Is mapreduce
 my only option here?

 Thanks,
 -Russ





Re: Combiner behaviour

2014-03-19 Thread Josh Elser

Ummm, you got the gist of it (I may have misspoken in what I initially said).

My first thought was to make an iterator that filters down to just the
columns you want. It doesn't look like the core includes an iterator that
will do this for you efficiently (although I know I've done something
similar in the past). This iterator would scan the rows of your table,
returning just the columns you want.


00021ccaac30 meta:size []1807
00021cdaac30 meta:size []656
00021cfaac30 meta:size []565
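
As a rough sketch, the column-projection piece being described could be a
small subclass of org.apache.accumulo.core.iterators.Filter (the class name
here is hypothetical and the column is hard-coded for brevity); in practice
ScannerBase#fetchColumn, as used later in the thread, gives the same
projection:

public class MetaSizeProjectionFilter extends Filter {
  private static final Text FAM = new Text("meta");
  private static final Text QUAL = new Text("size");

  @Override
  public boolean accept(Key k, Value v) {
    // Keep only meta:size entries; everything else is dropped server-side.
    return k.compareColumnFamily(FAM) == 0 && k.compareColumnQualifier(QUAL) == 0;
  }
}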

Then, we could put the summing combiner on top of that iterator to sum
those and get back a single key. The row in the key you return should be
the last row you included in the sum. This way, if the batchscanner retries
under the hood, you'll resume where you left off and won't double-count
anything.


(you could even do things like sum a maximum of N rows before returning 
back some intermediate count to better parallelize things)


00021cfaac30 meta:size []3028

So, each ScanSession (what the batchscanner is doing underneath the hood)
would return you a value over which your client would do a final summation.


The final stack would be {(data from Accumulo) -> SKVI to project columns ->
summing combiner} -> final summation, where {...} denotes work done
server-side. This is one of those things where the Accumulo API really
shines.


On 3/19/14, 6:40 PM, Russ Weeks wrote:

Hi, Josh,

Thanks very much for your response. I think I get what you're saying,
but it's kind of blowing my mind.

Are you saying that if I first set up an iterator that took my key/value
pairs like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

And emitted something like,

0 meta:size [] 1807
0 meta:size [] 656
0 meta:size [] 565

And then applied a SummingCombiner at a lower priority than that
iterator, then... it should work, right?

I'll give it a try.

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser josh.el...@gmail.com wrote:

Russ,

Remember that data is distributed across multiple nodes in your cluster by
tablet.

A tablet, at the very minimum, will contain one row. Another way to say the
same thing is that a row will never be split across multiple tablets. The
only guarantee you get from Accumulo here is that you can use a combiner to
do your combination across one row.

However, when you combine (pun not intended) another SKVI with the
Combiner, you can do more merging of that intermediate combined
value from each row before returning back to the client. You can
think of this approach as doing a multi-level summation.

This still requires one final sum on the client side, but you should
get quite the reduction with this approach over doing the entire sum
client side. You sum the meta:size column in parallel across parts
of the table (server-side) and then client-side you sum the sums
from each part.

I can sketch this out in more detail if it's not clear. HTH


On 3/19/14, 6:18 PM, Russ Weeks wrote:

The accumulo manual states that combiners can be applied to
values which
share the same rowID, column family, and column qualifier. Is
there any
way to adjust this behaviour? I have rows that look like,

00021ccaac30 meta:size []1807
00021ccaac30 meta:source []data2
00021cdaac30 meta:filename []doc02985453
00021cdaac30 meta:size []656
00021cdaac30 meta:source []data2
00021cfaac30 meta:filename []doc04484522
00021cfaac30 meta:size []565
00021cfaac30 meta:source []data2
00021dcaac30 meta:filename []doc03342958

and I'd like to sum up all the values of meta:size across all
rows.  I
know I can scan the sizes and sum them on the client side, but I was
hoping there would be a way to do this inside my cluster. Is
mapreduce
my only option here?

Thanks,
-Russ




Re: Combiner behaviour

2014-03-19 Thread Russ Weeks
Hi, Josh,

Thanks for walking me through this.  This is my first stab at it:

import java.io.IOException;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;

public class RowSummingCombiner extends WrappingIterator {
  Key lastKey;
  long sum;

  public Key getTopKey() {
    if (lastKey == null)
      return super.getTopKey();
    return lastKey;
  }

  public Value getTopValue() {
    // Returning the value "consumes" the sum; hasTop() then falls back to the source.
    lastKey = null;
    return new Value(Long.toString(sum).getBytes());
  }

  public boolean hasTop() {
    return lastKey != null || super.hasTop();
  }

  public void next() throws IOException {
    // Drain the (already column-restricted) source, summing values across rows.
    while (super.hasTop()) {
      lastKey = super.getTopKey();
      if (!lastKey.isDeleted()) {
        sum += Long.parseLong(super.getTopValue().toString());
      }
      super.next();
    }
  }

  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    RowSummingCombiner instance = new RowSummingCombiner();
    instance.setSource(getSource().deepCopy(env));
    return instance;
  }
}

I restrict the scanner to the single CF/CQ that I'm interested in summing.
The biggest disadvantage is that I can't utilize any of the logic in the
Combiner class hierarchy for value decoding etc. because the logic to
combine based on the common (row, cf, cq, vis) tuple is baked in at the
top level of that hierarchy and I don't see an easy way to plug in new
behaviour. But, each instance of the RowSummingCombiner returns its own
sum, and then my client just has to add up a handful of values. Is this
what you were getting at?
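
A minimal sketch of the client side under this approach (the table name
"docs" and an existing Connector are assumptions, and RowSummingCombiner
needs to be on the tablet servers' classpath):

BatchScanner bs = connector.createBatchScanner("docs", Authorizations.EMPTY, 4);
bs.setRanges(Collections.singleton(new Range()));    // whole table
bs.fetchColumn(new Text("meta"), new Text("size"));  // restrict to the one CF/CQ
bs.addScanIterator(new IteratorSetting(50, "rowsum", RowSummingCombiner.class));

long total = 0;
for (Map.Entry<Key,Value> entry : bs) {
  // one (or a few) partial sums come back per scan session
  total += Long.parseLong(entry.getValue().toString());
}
bs.close();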

Regards,
-Russ


On Wed, Mar 19, 2014 at 3:51 PM, Josh Elser josh.el...@gmail.com wrote:

 Ummm, you got the gist of it (I may have misspoken in what I initially said).

 My first thought was to make an iterator that filters down to just the
 columns you want. It doesn't look like the core includes an iterator that
 will do this for you efficiently (although I know I've done something
 similar in the past). This iterator would scan the rows of your table,
 returning just the columns you want.

 00021ccaac30 meta:size []1807
 00021cdaac30 meta:size []656
 00021cfaac30 meta:size []565

 Then, we could put the summing combiner on top of that iterator to sum
 those and get back a single key. The row in the key you return should be
 the last row you included in the sum. This way, if the batchscanner retries
 under the hood, you'll resume where you left off and won't double-count
 anything.

 (you could even do things like sum a maximum of N rows before returning
 back some intermediate count to better parallelize things)

 00021cfaac30 meta:size []3028

 So, each ScanSession (what the batchscanner is doing underneath the hood)
 would return you a value over which your client would do a final summation.

 The final stack would be {(data from Accumulo) -> SKVI to project columns ->
 summing combiner} -> final summation, where {...} denotes work done
 server-side. This is one of those things where the Accumulo API really
 shines.


 On 3/19/14, 6:40 PM, Russ Weeks wrote:

 Hi, Josh,

 Thanks very much for your response. I think I get what you're saying,
 but it's kind of blowing my mind.

 Are you saying that if I first set up an iterator that took my key/value
 pairs like,

 00021ccaac30 meta:size []1807
 00021ccaac30 meta:source []data2
 00021cdaac30 meta:filename []doc02985453
 00021cdaac30 meta:size []656
 00021cdaac30 meta:source []data2
 00021cfaac30 meta:filename []doc04484522
 00021cfaac30 meta:size []565
 00021cfaac30 meta:source []data2
 00021dcaac30 meta:filename []doc03342958

 And emitted something like,

 0 meta:size [] 1807
 0 meta:size [] 656
 0 meta:size [] 565

 And then applied a SummingCombiner at a lower priority than that
 iterator, then... it should work, right?

 I'll give it a try.

 Regards,
 -Russ


 On Wed, Mar 19, 2014 at 3:33 PM, Josh Elser josh.el...@gmail.com wrote:

 Russ,

 Remember that data is distributed across multiple nodes in your cluster by
 tablet.

 A tablet, at the very minimum, will contain one row. Another way to say the
 same thing is that a row will never be split across multiple tablets. The
 only guarantee you get from Accumulo here is that you can use a combiner to
 do your combination across one row.

 However, when you combine (pun not intended) another SKVI with the
 Combiner, you can do more merging of that intermediate combined
 value from each row before returning back to the client. You can
 think of this approach as doing a multi-level summation.

 This still requires one final sum on the client side, but you should
 get quite the reduction with this approach over doing the entire sum
 client side. You sum the meta:size column in parallel across parts
 of the table (server-side) and then client-side you sum the sums
 from each part.

 I can sketch this out in more detail if it's not clear. HTH