On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles <k...@google.com.invalid> 
wrote:
> Manu, I think your critique about user interface clarity is valid.
> CombineFn conflates a few operations and is not that clear about what it is
> doing or why. You seem to be concerned about CombineFn versus
> SerializableFunction constructors for the Combine family of transforms. I
> thought I'd respond from my own perspective, in case it is helpful. It is
> mostly the same things that Luke has said. Let's ignore keys. I don't think
> they change things much.
>
> As you seem to already understand, a CombineFn is a convenient collapsed
> representation of three functions:
>
>     init : InputT -> AccumT
>     combiner: (AccumT, AccumT) -> AccumT
>     extract: AccumT -> OutputT
>
> And the real semantics:
>
>     MapElements.via(init)
>     Combine.via(combiner)
>     MapElements.via(extract)
>
> For starters, "associative" is not even a well-typed word to use unless
> input type and output type are the same. So it is `combiner` that needs to
> be associative and commutative. Sometimes `combiner` also has an identity
> element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
> things here (the latter is never meaningfully used). When we say a
> CombineFn has to be "associative" and "commutative" we just mean that it
> can be factored into these methods.
>
> So the SerializableFunction just needs to be factorable into these methods,
> too, like Luke said. Pragmatically, if we only have a
> SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
> do hierarchical combines (can't feed the output of one layer into the next
> layer), so associativity is irrelevant and it might as well be a
> MapElements. So it only makes sense to allow
> SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
> binary function would make sense for lambdas, etc.
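The binary variant is easy to reconcile with the Iterable shape; a small hedged sketch (generic helper name is made up) of lifting a binary associative combiner, so that method references like `Long::sum` would fit:

```java
import java.util.function.BinaryOperator;

// Hypothetical helper: lift a binary associative combiner plus its identity
// element to the Iterable<AccumT> -> AccumT shape discussed above.
class Lift {
    static <A> A reduce(Iterable<A> accums, A identity, BinaryOperator<A> combiner) {
        A result = identity;
        for (A a : accums) {
            result = combiner.apply(result, a);
        }
        return result;
    }
}
```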
>
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
>  - It is a major efficiency gain to mutate the accumulator.
>  - Usually `init` is trivial and best to inline, hence addInput(InputT,
> AccumT)

I would add that often the map InputT -> AccumT is *non-trivial*, as
is AccumT -> AccumT, so AccumT + InputT -> AccumT is preferable (both
for efficiency and code simplicity) for anything beyond trivial
combiners. FlumeJava, a predecessor to Beam that we took many lessons
from, had an explicit init rather than addInput, and that turned out to
be a drawback when implementing CombineFns.
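The efficiency point about mutating the accumulator can be sketched like this (a toy mean accumulator, not the Beam CombineFn interface itself): folding each input directly into a mutable accumulator avoids allocating a fresh single-element accumulator per input.

```java
// Toy mutable accumulator showing why addInput(AccumT, InputT) beats
// an explicit per-element init: no per-input allocation, the accumulator
// is mutated in place.
class MutableMean {
    long sum;
    long count;

    // Fold one input directly into the accumulator.
    void addInput(int x) {
        sum += x;
        count += 1;
    }

    // Merging accumulators is still the associative/commutative core.
    void mergeAccumulator(MutableMean other) {
        sum += other.sum;
        count += other.count;
    }

    double extractOutput() { return (double) sum / count; }
}
```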

>  - With `compact` we allow multiple physical representations of the same
> semantic accumulator, and a hook to switch between them
>  - And it is hard to take the user on the journey from the real
> reasons behind the design to the particular Java interface
>
> Note also that CombineWithContext allows side inputs, which complicates the
> formalities somewhat but doesn't change the intuition.
>
> Kenn
>
> On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> I'm a bit confused here because neither of them requires the same type of
>> input and output. Also, the Javadoc of Globally says "It is common for
>> {@code InputT == OutputT}, but not required". If an associative and
>> commutative function is expected, why don't they have restrictions like
>> Combine.perKey(SerializableFunction) ?
>>
>> I understand the motivation and requirements behind Combine functions. I'm
>> more asking about the user interface consistency.
>> By the way, it's hard to know what Combine.Globally does from the name, but
>> that discussion should go in another thread.
>>
>> Thanks for your patience here.
>>
>> Manu
>>
>> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> GlobalCombineFn and PerKeyCombineFn still expect an associative and
>> commutative function when accumulating.
>> GlobalCombineFn is shorthand for assigning everything to a single key,
>> doing the combine, and then discarding the key and extracting the single
>> output.
>> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
>> modify the accumulation in any way.
>>
>> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
>> and Combine.perKey(PerKeyCombineFn) ? Why don't all of them have these
>> requirements ?
>>
>> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that, from an API perspective, it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
>> people in the data processing world expect this
>> parallelization/optimization to be performed, so exposing such a method
>> would be dangerous: it would break users' expectations. From the design
>> perspective it is therefore a hard requirement. If PCollections ever become
>> ordered or gain other properties, these requirements may loosen, but that
>> seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements to which you pass a
>> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
>> OutputT>> which delegates to a SerializableFunction<Iterable<InputT>,
>> OutputT> should be trivial.
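That wrapper really is a one-liner; a hedged sketch, with `java.util.Map.Entry` standing in for Beam's `KV` and `Function` standing in for `SerializableFunction` (neither the SDK types nor the helper name below are from the thread):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.Function;

// Sketch of the wrapper Lukasz describes: lift an unkeyed
// Iterable<InputT> -> OutputT function to the keyed
// KV<K, Iterable<InputT>> -> KV<K, OutputT> shape.
// Map.Entry stands in for Beam's KV here.
class KeyedWrapper {
    static <K, I, O> Function<Map.Entry<K, Iterable<I>>, Map.Entry<K, O>>
            perKey(Function<Iterable<I>, O> fn) {
        // Apply fn to the grouped values; pass the key through untouched.
        return kv -> new SimpleEntry<>(kv.getKey(), fn.apply(kv.getValue()));
    }
}
```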
>>
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> Thanks for the thorough explanation. I see the benefits for such a
>> function.
>> My follow-up question is whether this is a hard requirement.
>> There are computations that don't satisfy this (I think it's the monoid
>> laws) but that are possible and easier to write with
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
>> difficult to provide an underlying CombineFn.
>>
>>
>> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> To be able to do such an optimization, it requires you to actually have
>> three functions:
>> InputT -> AccumT : creates the intermediate representation which
>> allows for associative combining
>> Iterable<AccumT> -> AccumT : performs the actual combining
>> AccumT -> OutputT : extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumT> -> AccumT, and the other two functions are the
>> identity functions.
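The hierarchical combine that this associativity enables can be sketched in a few lines (chunk sizes and helper name are illustrative, not from the thread): each chunk models one worker's local pre-combine, and a second layer combines the per-worker partials.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustration of the hierarchical combine runners rely on: because the
// Iterable<AccumT> -> AccumT function is associative, partial results
// computed on separate machines can themselves be combined again.
class Hierarchical {
    static <A> A combineInChunks(List<A> values, int chunkSize,
                                 Function<Iterable<A>, A> combiner) {
        List<A> partials = new ArrayList<>();
        for (int i = 0; i < values.size(); i += chunkSize) {
            // Each chunk models one worker's local pre-combine.
            partials.add(combiner.apply(
                values.subList(i, Math.min(i + chunkSize, values.size()))));
        }
        // A second layer combines the per-worker partials.
        return combiner.apply(partials);
    }
}
```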
>>
>> Supporting a Combine.perKey which goes directly from Iterable<InputT>
>> -> OutputT would require that it occur within a single machine,
>> removing the parallelization benefits that runners provide; for almost
>> all cases that is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>>
>>
>>
>>
