The javadoc goes through a lengthy explanation:
http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.CombineFn.html

A CombineFn<InputT, AccumT, OutputT> specifies how to combine a collection
of input values of type InputT into a single output value of type OutputT.
It does this via one or more intermediate mutable accumulator values of
type AccumT.
The overall process to combine a collection of input InputT values into a
single output OutputT value is as follows:

The input InputT values are partitioned into one or more batches.
1) For each batch, the createAccumulator() operation is invoked to create a
fresh mutable accumulator value of type AccumT, initialized to represent
the combination of zero values.
2) For each input InputT value in a batch, the addInput(AccumT, InputT)
operation is invoked to add the value to that batch's accumulator AccumT
value. The accumulator may just record the new value (e.g., if AccumT ==
List<InputT>), or may do work to represent the combination more compactly.
3) The mergeAccumulators(java.lang.Iterable<AccumT>) operation is invoked
to combine a collection of accumulator AccumT values into a single combined
output accumulator AccumT value, once the merging accumulators have had all
the input values in their batches added to them. This operation is
invoked repeatedly, until there is only one accumulator value left.
4) The extractOutput(AccumT) operation is invoked on the final accumulator
AccumT value to get the output OutputT value.
For example:


 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum {
     int sum = 0;
     int count = 0;
   }
   public Accum createAccumulator() {
     return new Accum();
   }
   public Accum addInput(Accum accum, Integer input) {
     accum.sum += input;
     accum.count++;
     return accum;
   }
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
 }
 PCollection<Integer> pc = ...;
 PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));
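
To make the four steps concrete, here is a rough plain-Java sketch of how a runner might drive them. It does not use the Beam SDK: the class CombineLifecycleSketch and its combine(...) driver are hypothetical names of mine, the batches are partitioned by hand, and the accumulator logic is simply copied from the AverageFn above. A real runner decides the batching itself and may merge in several rounds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a runner; not part of the Beam SDK.
class CombineLifecycleSketch {

  // Same shape as the AverageFn accumulator.
  static class Accum {
    int sum = 0;
    int count = 0;
  }

  static Accum createAccumulator() {
    return new Accum();
  }

  static Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  static Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }

  // Drives the four steps over batches that were already partitioned.
  static Double combine(List<List<Integer>> batches) {
    List<Accum> partials = new ArrayList<>();
    for (List<Integer> batch : batches) {
      Accum accum = createAccumulator();        // step 1: fresh accumulator per batch
      for (Integer value : batch) {
        accum = addInput(accum, value);         // step 2: fold each input in
      }
      partials.add(accum);
    }
    Accum merged = mergeAccumulators(partials); // step 3: single merge round here
    return extractOutput(merged);               // step 4: final output
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = Arrays.asList(
        Arrays.asList(1, 2),
        Arrays.asList(3, 4, 5));
    System.out.println(combine(batches)); // prints 3.0
  }
}
```

Note that only the small Accum objects cross the batch boundary in step 3, not the individual input values.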

Combining functions used by Combine.Globally, Combine.PerKey,
Combine.GroupedValues, and PTransforms derived from them should be
associative and commutative. Associativity is required because input values
are first broken up into subgroups before being combined, and their
intermediate results further combined, in an arbitrary tree structure.
Commutativity is required because any order of the input values is ignored
when breaking up input values into groups.
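
A small illustration of why this matters, again in plain Java without Beam. The class and method names are made up for the example. Averaging the per-batch averages is not associative, so its result depends on how the inputs happen to be batched, whereas carrying (sum, count) and dividing only at the end gives the same answer for any batching.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of the Beam SDK.
class BatchingSensitivitySketch {

  // Mean of a single batch.
  static double mean(List<Integer> values) {
    double sum = 0;
    for (int v : values) {
      sum += v;
    }
    return sum / values.size();
  }

  // Not associative: averages the per-batch averages.
  static double meanOfMeans(List<List<Integer>> batches) {
    double total = 0;
    for (List<Integer> batch : batches) {
      total += mean(batch);
    }
    return total / batches.size();
  }

  // Associative and commutative: accumulates (sum, count), divides once.
  static double sumCountMean(List<List<Integer>> batches) {
    long sum = 0;
    long count = 0;
    for (List<Integer> batch : batches) {
      for (int v : batch) {
        sum += v;
        count++;
      }
    }
    return (double) sum / count;
  }

  public static void main(String[] args) {
    // The same five inputs, split two different ways.
    List<List<Integer>> a = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4, 5));
    List<List<Integer>> b = Arrays.asList(Arrays.asList(1), Arrays.asList(2, 3, 4, 5));

    System.out.println(meanOfMeans(a));  // prints 2.75
    System.out.println(meanOfMeans(b));  // prints 2.25
    System.out.println(sumCountMean(a)); // prints 3.0
    System.out.println(sumCountMean(b)); // prints 3.0
  }
}
```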

On Tue, Nov 22, 2016 at 7:11 AM, Matthias Baetens <
[email protected]> wrote:

> Hi Aljoscha,
>
> Thank you very much for the clear answer, it solves my problem (since I
> now know I should use a SerializableFunction when wanting to have a full
> view of the data for one key).
>
> For the second question, the accumulator that I wrote appends elements to
> the ArrayList of a custom object in case it's not in the list yet:
>
> @DefaultCoder(AvroCoder.class)
> public static class Accum {
>   Log log = null;
> }
> @Override
> public Accum createAccumulator() { return new Accum(); }
> @Override
> public Accum addInput(Accum accum, Log input) {
>   if (accum.log == null) {
>     accum.log = new Log(input);
>   }
>   String[] hits = input.getHits_appInfo_screenName().split("/");
>   for (int i = 0; i < full_list.length; i++) {
>     if (Arrays.asList(hits).contains(full_list[i])
>         && !accum.log.getSparseArray().contains(i)) {
>       accum.log.getSparseArray().add(i);
>     }
>   }
>   return accum;
> }
> @Override
> public Accum mergeAccumulators(Iterable<Accum> accums) {
>   Accum merged = createAccumulator();
>   for (Accum accum : accums) {
>     if (merged.log == null) {
>       merged.log = new Log(accum.log);
>     } else {
>       for (Integer index : accum.log.getSparseArray()) {
>         if (!merged.log.getSparseArray().contains(index)) {
>           merged.log.getSparseArray().add(index);
>         }
>       }
>     }
>   }
>   return merged;
> }
> @Override
> public Log extractOutput(Accum accum) {
>   return accum.log;
> }
>
> Hope this helps for answering the second question!
>
> Thanks again :)
>
> Best,
>
> Matthias
>
> On Tue, Nov 22, 2016 at 11:47 AM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> Hi Matthias,
>> when the shuffle happens is not defined by the Beam model so it depends
>> on the runner. You are right, though, that a runner can optimise execution
>> when you specify a CombineFn. In that case a runner can choose to combine
>> elements before shuffling to reduce the amount of data that we have to
>> shuffle across the network. With a SerializableFunction that's not possible
>> because we don't have an intermediate accumulation type as we have with a
>> CombineFn. Therefore, the runner has to ship all elements for a given key
>> to one machine to apply the SerializableFunction.
>>
>> Regarding your second question, could you maybe send a code snippet? That
>> would allow us to have a look and give a good answer.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 22 Nov 2016 at 12:32 Matthias Baetens <
>> [email protected]> wrote:
>>
>>> Hi there,
>>>
>>> I had some questions about the internal working of these two concepts
>>> and where I could find more info on this (so I might be able to solve
>>> similar problems in the future myself). Here we go:
>>>
>>> + When doing a GroupByKey, when does the shuffling actually take place?
>>> Could it be the behaviour is not the same when using a CombineFn to
>>> aggregate compared to when using a SerializableFunction? (I have a feeling
>>> in the first case not all the keys get shuffled to one machine, while it
>>> does for the second).
>>>
>>> + When using Accumulators in a CombineFn, what are the actual internals?
>>> Are there any docs on this? The problem I run into is that, when I try
>>> adding elements to an ArrayList and then merge the ArrayLists, the output
>>> is an empty list. The problem could probably be solved by using a
>>> SerializableFunction to combine everything at once, but you might lose the
>>> advantages of parallelisation in that case (~ above).
>>>
>>> Thanks a lot :)
>>>
>>> Best,
>>>
>>> Matthias
>>
>
>
> --
>
> *Matthias Baetens*
>
>
> *datatonic | data power unleashed*
> office +44 203 668 3680  |  mobile +44 74 918 20646
>
> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>
