Is this ever actually true? "Arrays.asList(hits).contains(full_list[i])"
Its unclear if full_list is empty or not, if its empty then your Accum.log
is empty.

When you were asking about the SerializableFunction, were you specifically
referring to one which you pass into Combine such as Combine.perKey (
http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.html#perKey-org.apache.beam.sdk.transforms.SerializableFunction-
)?

If so, then it is not true that the all the values for a single key need to
be shipped to one worker. For example, if you had four values (A0, A1, A2,
A3), one worker could combine A0 and A1 producing A01 and another worker
could combine A2 and A3 producing A23 with a third worker combining A01 and
A23 producing the final result A0123.

Since your specifically trying to combine the same input as the output
type, I would stick with the simpler SerializableFunction implementation as
you get all the performance benefits with a much simpler implementation
requirement.


On Tue, Nov 22, 2016 at 9:37 AM, Matthias Baetens <
[email protected]> wrote:

> Hi Lukasz,
> Hi all,
>
> Thanks for your message.
> The code in my previous e-mail specifies the functions I have defined for
> my accumulator, which is basically adding to an ArrayList and then merging
> into one ArrayList. I think I have implemented all the functions necessary,
> as specified in the docs. However, when writing the results, the ArrayLists
> seem to be empty. When loggingm only the logs from the *addInputs() *function
> get written.
> Do you have any idea why this approach would not work for ArrayList and
> how to solve it (or if there are any alternatives)?
>
> Cheers!
>
> Matthias
>
> On Tue, Nov 22, 2016 at 2:14 PM, Lukasz Cwik <[email protected]> wrote:
>
>> The javadoc goes through a lengthy explanation:
>> http://beam.incubator.apache.org/documentation/sdks/javadoc/
>> 0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.CombineFn.html
>>
>> A CombineFn<InputT, AccumT, OutputT> specifies how to combine a
>> collection of input values of type InputT into a single output value of
>> type OutputT. It does this via one or more intermediate mutable accumulator
>> values of type AccumT.
>> The overall process to combine a collection of input InputT values into a
>> single output OutputT value is as follows:
>>
>> The input InputT values are partitioned into one or more batches.
>> 1) For each batch, the createAccumulator() operation is invoked to create
>> a fresh mutable accumulator value of type AccumT, initialized to represent
>> the combination of zero values.
>> 2) For each input InputT value in a batch, the addInput(AccumT, InputT)
>> operation is invoked to add the value to that batch's accumulator AccumT
>> value. The accumulator may just record the new value (e.g., if AccumT ==
>> List<InputT>, or may do work to represent the combination more compactly.
>> 3) The mergeAccumulators(java.lang.Iterable<AccumT>) operation is
>> invoked to combine a collection of accumulator AccumT values into a single
>> combined output accumulator AccumT value, once the merging accumulators
>> have had all all the input values in their batches added to them. This
>> operation is invoked repeatedly, until there is only one accumulator value
>> left.
>> 4) The extractOutput(AccumT) operation is invoked on the final
>> accumulator AccumT value to get the output OutputT value.
>> For example:
>>
>>
>>  public class AverageFn extends CombineFn<Integer, AverageFn.Accum,
>> Double> {
>>    public static class Accum {
>>      int sum = 0;
>>      int count = 0;
>>    }
>>    public Accum createAccumulator() {
>>      return new Accum();
>>    }
>>    public Accum addInput(Accum accum, Integer input) {
>>        accum.sum += input;
>>        accum.count++;
>>        return accum;
>>    }
>>    public Accum mergeAccumulators(Iterable<Accum> accums) {
>>      Accum merged = createAccumulator();
>>      for (Accum accum : accums) {
>>        merged.sum += accum.sum;
>>        merged.count += accum.count;
>>      }
>>      return merged;
>>    }
>>    public Double extractOutput(Accum accum) {
>>      return ((double) accum.sum) / accum.count;
>>    }
>>  }
>>  PCollection<Integer> pc = ...;
>>  PCollection<Double> average = pc.apply(Combine.globally(new
>> AverageFn()));
>>
>> Combining functions used by Combine.Globally, Combine.PerKey,
>> Combine.GroupedValues, and PTransforms derived from them should be
>> associative and commutative. Associativity is required because input values
>> are first broken up into subgroups before being combined, and their
>> intermediate results further combined, in an arbitrary tree structure.
>> Commutativity is required because any order of the input values is ignored
>> when breaking up input values into groups.
>>
>> On Tue, Nov 22, 2016 at 7:11 AM, Matthias Baetens <
>> [email protected]> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Thank you very much for the clear answer, it solves my problem (since I
>>> now know I should use a SerializableFunction when wanting to have a full
>>> view of the data for one key).
>>>
>>> For the second question, the accumulator that I wrote appends elements
>>> to the ArrayList of a custom object in case it's not in the list yet:
>>>
>>> @DefaultCoder(AvroCoder.class)
>>> public static class Accum {
>>> Log log = null;
>>> }
>>> @Override
>>> public Accum createAccumulator() { return new Accum(); }
>>> @Override
>>> public Accum addInput(Accum accum, Log input) {
>>> if(accum.log == null) {
>>> accum.log = new Log(input);
>>> }
>>> String[] hits = input.getHits_appInfo_screenName().split("/");
>>> for(int i = 0; i < full_list.length; i++) {
>>> if(Arrays.asList(hits).contains(full_list[i]) &&
>>> !accum.log.getSparseArray().contains(i)) accum.log.getSparseArray().add
>>> (i);
>>> }
>>> return accum;
>>> }
>>> @Override
>>> public Accum mergeAccumulators(Iterable<Accum> accums) {
>>> Accum merged = createAccumulator();
>>> for(Accum accum : accums) {
>>> if(merged.log == null){
>>> merged.log = new Log(accum.log);
>>> }
>>> else {
>>> for(Integer index : accum.log.getSparseArray()) {
>>> if(!merged.log.getSparseArray().contains(index))
>>> merged.log.getSparseArray().add(index);
>>> }
>>> }
>>> }
>>> return merged;
>>> }
>>> @Override
>>> public Log extractOutput(Accum accum) {
>>> return accum.log;
>>> }
>>>
>>> Hope this helps for answering the second question!
>>>
>>> Thanks again :)
>>>
>>> Best,
>>>
>>> Matthias
>>>
>>> On Tue, Nov 22, 2016 at 11:47 AM, Aljoscha Krettek <[email protected]>
>>> wrote:
>>>
>>>> Hi Matthias,
>>>> when the shuffle happens is not defined by the Beam model so it depends
>>>> on the runner. You are right, though, that a runner can optimise execution
>>>> when you specify a CombineFn. In that case a runner can choose to combine
>>>> elements before shuffling to reduce the amount of data that we have to
>>>> shuffle across the network. With a SerializableFunction that's not possible
>>>> because we don't have an intermediate accumulation type as we have with a
>>>> CombineFn. Therefore, the runner has to ship all elements for a given key
>>>> to one machine to apply the SerializableFunction.
>>>>
>>>> Regarding your second question, could you maybe send a code snipped?
>>>> That would allow us to have a look and give a good answer.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Tue, 22 Nov 2016 at 12:32 Matthias Baetens <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I had some questions about the internal working of these two concepts
>>>>> and where I could find more info on this (so I might be able similar
>>>>> problems in the future myself. Here we go:
>>>>>
>>>>> + When doing a GroupByKey, when does the shuffling actually take
>>>>> place? Could it be the behaviour is not the same when using a CombineFn to
>>>>> aggregate compared to when using a Serializablefunction? (I have a feeling
>>>>> in the first case not all the keys get shuffled to one machine, while it
>>>>> does for the second).
>>>>>
>>>>> + When using Accumulators in a CombineFn, what are the actual
>>>>> internals? Is there any docs on this? The problem I run into is that, when
>>>>> I try adding elements to an ArrayList and then merge ArrayList, the output
>>>>> is an empty list. The problem could probably be solved by using a
>>>>> Serializablefunction to Combine everything at once, but you might loose 
>>>>> the
>>>>> advantages of parallellisation in that case (~ above).
>>>>>
>>>>> Thanks a lot :)
>>>>>
>>>>> Best,
>>>>>
>>>>> Matthias
>>>>> --
>>>>>
>>>>> *Matthias Baetens*
>>>>>
>>>>>
>>>>> *datatonic | data power unleashed*
>>>>> office +44 203 668 3680 <+44%2020%203668%203680>  |  mobile +44 74
>>>>> 918 20646
>>>>>
>>>>> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> *Matthias Baetens*
>>>
>>>
>>> *datatonic | data power unleashed*
>>> office +44 203 668 3680 <+44%2020%203668%203680>  |  mobile +44 74 918
>>> 20646
>>>
>>> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>>>
>>
>>
>
>
> --
>
> *Matthias Baetens*
>
>
> *datatonic | data power unleashed*
> office +44 203 668 3680 <+44%2020%203668%203680>  |  mobile +44 74 918
> 20646
>
> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>

Reply via email to