One minor correction I wanted to note: when you use

    Combine.perKey(SerializableFunction<Iterable<V>, V>)

the SerializableFunction plays roughly the same role as mergeAccumulators
(and the Combine is implemented just so), so it requires that the input
type V is the accumulator type, as with aggregations like Sum. In fact,
this does allow a runner to combine prior to the shuffle.
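
A minimal sketch of why a function from Iterable<V> to V can be applied in
stages. This is plain Java, not Beam: java.util.function.Function stands in
for SerializableFunction, and the class name, the SUM combiner and the two
helper methods are hypothetical names chosen for illustration. It only shows
that, for an associative and commutative function whose output type equals
its input type, combining partial results gives the same answer as combining
everything in one call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in: Function<Iterable<Integer>, Integer> plays the role of
// Beam's SerializableFunction<Iterable<V>, V>. No Beam classes are used here.
final class PartialCombineSketch {

    // Hypothetical combiner: sums whatever it is given. Because input and
    // output share one type, its output can be fed back in as an input.
    static final Function<Iterable<Integer>, Integer> SUM = values -> {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    };

    // Combine everything in one call, as if all values sat on one worker.
    static int combineAllAtOnce(List<Integer> values) {
        return SUM.apply(values);
    }

    // Combine two halves separately, then combine the two partial results,
    // which is the role mergeAccumulators plays in a CombineFn.
    static int combineInTwoStages(List<Integer> values) {
        int mid = values.size() / 2;
        int left = SUM.apply(values.subList(0, mid));
        int right = SUM.apply(values.subList(mid, values.size()));
        return SUM.apply(Arrays.asList(left, right));
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(combineAllAtOnce(values));   // prints 10
        System.out.println(combineInTwoStages(values)); // prints 10
    }
}
```

A function that is not associative and commutative (for example, one that
returns the first element it sees) would not give stable results when
applied this way.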

On Nov 22, 2016 06:57, "Lukasz Cwik" <[email protected]> wrote:

> Is this ever actually true? "Arrays.asList(hits).contains(full_list[i])"
> It's unclear whether full_list is empty or not; if it's empty, then your
> Accum.log is empty.
>
> When you were asking about the SerializableFunction, were you specifically
> referring to one which you pass into Combine such as Combine.perKey (
> http://beam.incubator.apache.org/documentation/sdks/
> javadoc/0.3.0-incubating/org/apache/beam/sdk/transforms/
> Combine.html#perKey-org.apache.beam.sdk.transforms.SerializableFunction-)?
>
> If so, then it is not true that all the values for a single key need
> to be shipped to one worker. For example, if you had four values (A0, A1,
> A2, A3), one worker could combine A0 and A1 producing A01 and another
> worker could combine A2 and A3 producing A23 with a third worker combining
> A01 and A23 producing the final result A0123.
>
> Since you're specifically trying to combine the same input as the output
> type, I would stick with the simpler SerializableFunction implementation as
> you get all the performance benefits with a much simpler implementation
> requirement.
>
>
> On Tue, Nov 22, 2016 at 9:37 AM, Matthias Baetens <
> [email protected]> wrote:
>
>> Hi Lukasz,
>> Hi all,
>>
>> Thanks for your message.
>> The code in my previous e-mail specifies the functions I have defined for
>> my accumulator, which is basically adding to an ArrayList and then merging
>> into one ArrayList. I think I have implemented all the functions necessary,
>> as specified in the docs. However, when writing the results, the ArrayLists
>> seem to be empty. When logging, only the logs from the *addInput()* function
>> get written.
>> Do you have any idea why this approach would not work for ArrayList and
>> how to solve it (or if there are any alternatives)?
>>
>> Cheers!
>>
>> Matthias
>>
>> On Tue, Nov 22, 2016 at 2:14 PM, Lukasz Cwik <[email protected]> wrote:
>>
>>> The javadoc goes through a lengthy explanation:
>>> http://beam.incubator.apache.org/documentation/sdks/javadoc/
>>> 0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.CombineFn.html
>>>
>>> A CombineFn<InputT, AccumT, OutputT> specifies how to combine a
>>> collection of input values of type InputT into a single output value of
>>> type OutputT. It does this via one or more intermediate mutable accumulator
>>> values of type AccumT.
>>> The overall process to combine a collection of input InputT values into
>>> a single output OutputT value is as follows:
>>>
>>> The input InputT values are partitioned into one or more batches.
>>> 1) For each batch, the createAccumulator() operation is invoked to
>>> create a fresh mutable accumulator value of type AccumT, initialized to
>>> represent the combination of zero values.
>>> 2) For each input InputT value in a batch, the addInput(AccumT, InputT)
>>> operation is invoked to add the value to that batch's accumulator AccumT
>>> value. The accumulator may just record the new value (e.g., if AccumT ==
>>> List<InputT>), or may do work to represent the combination more compactly.
>>> 3) The mergeAccumulators(java.lang.Iterable<AccumT>) operation is
>>> invoked to combine a collection of accumulator AccumT values into a single
>>> combined output accumulator AccumT value, once the merging accumulators
>>> have had all the input values in their batches added to them. This
>>> operation is invoked repeatedly, until there is only one accumulator value
>>> left.
>>> 4) The extractOutput(AccumT) operation is invoked on the final
>>> accumulator AccumT value to get the output OutputT value.
>>> For example:
>>>
>>>
>>>  public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
>>>    public static class Accum {
>>>      int sum = 0;
>>>      int count = 0;
>>>    }
>>>    public Accum createAccumulator() {
>>>      return new Accum();
>>>    }
>>>    public Accum addInput(Accum accum, Integer input) {
>>>      accum.sum += input;
>>>      accum.count++;
>>>      return accum;
>>>    }
>>>    public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>      Accum merged = createAccumulator();
>>>      for (Accum accum : accums) {
>>>        merged.sum += accum.sum;
>>>        merged.count += accum.count;
>>>      }
>>>      return merged;
>>>    }
>>>    public Double extractOutput(Accum accum) {
>>>      return ((double) accum.sum) / accum.count;
>>>    }
>>>  }
>>>  PCollection<Integer> pc = ...;
>>>  PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));
>>>
>>> Combining functions used by Combine.Globally, Combine.PerKey,
>>> Combine.GroupedValues, and PTransforms derived from them should be
>>> associative and commutative. Associativity is required because input values
>>> are first broken up into subgroups before being combined, and their
>>> intermediate results further combined, in an arbitrary tree structure.
>>> Commutativity is required because any order of the input values is ignored
>>> when breaking up input values into groups.
>>>
>>> On Tue, Nov 22, 2016 at 7:11 AM, Matthias Baetens <
>>> [email protected]> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Thank you very much for the clear answer, it solves my problem (since I
>>>> now know I should use a SerializableFunction when wanting to have a full
>>>> view of the data for one key).
>>>>
>>>> For the second question, the accumulator that I wrote appends elements
>>>> to the ArrayList of a custom object in case it's not in the list yet:
>>>>
>>>> @DefaultCoder(AvroCoder.class)
>>>> public static class Accum {
>>>>   Log log = null;
>>>> }
>>>>
>>>> @Override
>>>> public Accum createAccumulator() { return new Accum(); }
>>>>
>>>> @Override
>>>> public Accum addInput(Accum accum, Log input) {
>>>>   if (accum.log == null) {
>>>>     accum.log = new Log(input);
>>>>   }
>>>>   String[] hits = input.getHits_appInfo_screenName().split("/");
>>>>   for (int i = 0; i < full_list.length; i++) {
>>>>     if (Arrays.asList(hits).contains(full_list[i])
>>>>         && !accum.log.getSparseArray().contains(i)) {
>>>>       accum.log.getSparseArray().add(i);
>>>>     }
>>>>   }
>>>>   return accum;
>>>> }
>>>>
>>>> @Override
>>>> public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>   Accum merged = createAccumulator();
>>>>   for (Accum accum : accums) {
>>>>     if (merged.log == null) {
>>>>       merged.log = new Log(accum.log);
>>>>     } else {
>>>>       for (Integer index : accum.log.getSparseArray()) {
>>>>         if (!merged.log.getSparseArray().contains(index)) {
>>>>           merged.log.getSparseArray().add(index);
>>>>         }
>>>>       }
>>>>     }
>>>>   }
>>>>   return merged;
>>>> }
>>>>
>>>> @Override
>>>> public Log extractOutput(Accum accum) {
>>>>   return accum.log;
>>>> }
>>>>
>>>> Hope this helps for answering the second question!
>>>>
>>>> Thanks again :)
>>>>
>>>> Best,
>>>>
>>>> Matthias
>>>>
>>>> On Tue, Nov 22, 2016 at 11:47 AM, Aljoscha Krettek <[email protected]
>>>> > wrote:
>>>>
>>>>> Hi Matthias,
>>>>> when the shuffle happens is not defined by the Beam model so it
>>>>> depends on the runner. You are right, though, that a runner can optimise
>>>>> execution when you specify a CombineFn. In that case a runner can choose
>>>>> to combine elements before shuffling to reduce the amount of data that we
>>>>> have to shuffle across the network. With a SerializableFunction that's
>>>>> not possible because we don't have an intermediate accumulation type as
>>>>> we have with a CombineFn. Therefore, the runner has to ship all elements
>>>>> for a given key to one machine to apply the SerializableFunction.
>>>>>
>>>>> Regarding your second question, could you maybe send a code snippet?
>>>>> That would allow us to have a look and give a good answer.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Tue, 22 Nov 2016 at 12:32 Matthias Baetens <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi there,
>>>>>>
>>>>>> I had some questions about the internal workings of these two concepts
>>>>>> and where I could find more info on this (so I might be able to solve
>>>>>> similar problems in the future myself). Here we go:
>>>>>>
>>>>>> + When doing a GroupByKey, when does the shuffling actually take
>>>>>> place? Could it be that the behaviour is not the same when using a
>>>>>> CombineFn to aggregate compared to when using a SerializableFunction? (I
>>>>>> have a feeling that in the first case not all the keys get shuffled to
>>>>>> one machine, while they do in the second.)
>>>>>>
>>>>>> + When using Accumulators in a CombineFn, what are the actual
>>>>>> internals? Are there any docs on this? The problem I run into is that,
>>>>>> when I try adding elements to an ArrayList and then merge the ArrayLists,
>>>>>> the output is an empty list. The problem could probably be solved by
>>>>>> using a SerializableFunction to combine everything at once, but you might
>>>>>> lose the advantages of parallelisation in that case (~ above).
>>>>>>
>>>>>> Thanks a lot :)
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Matthias
>>>>>> --
>>>>>>
>>>>>> *Matthias Baetens*
>>>>>>
>>>>>>
>>>>>> *datatonic | data power unleashed*
>>>>>> office +44 203 668 3680  |  mobile +44 74 918 20646
>>>>>>
>>>>>> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>
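
The four CombineFn steps quoted in the thread (createAccumulator, addInput,
mergeAccumulators, extractOutput) can be sketched in plain Java for the
"collect distinct indices" case discussed above. This is only an
illustration under stated assumptions, not a diagnosis of the empty-list
problem: it uses no Beam classes, the class and method names are
hypothetical, the combine() helper merely simulates a runner splitting the
input into batches, and a TreeSet<Integer> replaces the Log/sparse-array
type from the original code. The accumulator starts as a valid empty
collection, so merging works even for a batch that received no input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Plain-Java sketch of the four CombineFn steps; it does not use Beam.
final class IndexUnionSketch {

    // Accumulator: a non-null, initially empty set of distinct indices.
    static final class Accum {
        final TreeSet<Integer> indices = new TreeSet<>();
    }

    // Step 1: a fresh accumulator representing "zero inputs seen".
    static Accum createAccumulator() {
        return new Accum();
    }

    // Step 2: fold one input into a batch's accumulator.
    static Accum addInput(Accum accum, Integer input) {
        accum.indices.add(input);
        return accum;
    }

    // Step 3: merge any number of accumulators, including empty ones.
    static Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum accum : accums) {
            merged.indices.addAll(accum.indices);
        }
        return merged;
    }

    // Step 4: turn the final accumulator into the output value.
    static List<Integer> extractOutput(Accum accum) {
        return new ArrayList<>(accum.indices);
    }

    // Simulated runner: one accumulator per batch, then a single merge.
    static List<Integer> combine(List<List<Integer>> batches) {
        List<Accum> partials = new ArrayList<>();
        for (List<Integer> batch : batches) {
            Accum accum = createAccumulator();
            for (Integer input : batch) {
                accum = addInput(accum, input);
            }
            partials.add(accum);
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = Arrays.asList(
            Arrays.asList(3, 1),
            Collections.<Integer>emptyList(),
            Arrays.asList(1, 7));
        System.out.println(combine(batches)); // prints [1, 3, 7]
    }
}
```

In a real pipeline the accumulator also has to be encodable by a Coder so it
can cross the shuffle; that part is outside the scope of this sketch.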
