Manu, I think your critique about user interface clarity is valid.
CombineFn conflates a few operations and is not that clear about what it is
doing or why. You seem to be concerned about CombineFn versus
SerializableFunction constructors for the Combine family of transforms. I
thought I'd respond from my own perspective, in case it is helpful. It is
mostly the same things that Luke has said. Let's ignore keys. I don't think
they change things much.

As you seem to already understand, a CombineFn is a convenient collapsed
representation of three functions:

    init     : InputT -> AccumT
    combiner : (AccumT, AccumT) -> AccumT
    extract  : AccumT -> OutputT

And the real semantics:

    MapElements.via(init)
    Combine.via(combiner)
    MapElements.via(extract)
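
To make the factoring concrete, here is a minimal sketch in plain Java (not the Beam API) that computes a mean from the three functions. The class name `MeanByParts`, the `SumCount` accumulator, and the constant names are all made up for illustration; only the shape init / combiner / extract comes from the description above.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical names throughout; plain Java, not Beam.
final class MeanByParts {
    // Accumulator: a running sum and a count.
    static final class SumCount {
        final double sum;
        final long count;
        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // init : InputT -> AccumT
    static final Function<Double, SumCount> INIT = x -> new SumCount(x, 1);

    // combiner : (AccumT, AccumT) -> AccumT, associative and commutative
    static final BinaryOperator<SumCount> COMBINER =
        (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count);

    // extract : AccumT -> OutputT
    static final Function<SumCount, Double> EXTRACT = acc -> acc.sum / acc.count;

    // Map with init, reduce with combiner, map with extract.
    static double mean(List<Double> inputs) {
        return inputs.stream()
            .map(INIT)
            .reduce(COMBINER)
            .map(EXTRACT)
            .orElseThrow(() -> new IllegalArgumentException("empty input"));
    }
}
```

Note that "mean" itself is not associative as a function from inputs to output; only the combiner over `SumCount` is.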

For starters, "associative" is not even a well-typed word to use unless
input type and output type are the same. So it is `combiner` that needs to
be associative and commutative. Sometimes `combiner` also has an identity
element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
things here (the latter is never meaningfully used). When we say a
CombineFn has to be "associative" and "commutative" we just mean that it
can be factored into these methods.

So the SerializableFunction just needs to be factorable into these methods,
too, like Luke said. Pragmatically, if we only have a
SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
do hierarchical combines (can't feed the output of one layer into the next
layer), so associativity is irrelevant and it might as well be a
MapElements. So it only makes sense to allow
SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
binary function would make sense for lambdas, etc.
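
A rough sketch of why the `Iterable<AccumT> -> AccumT` shape matters, again in plain Java rather than Beam: because input and output types match, the same function can be applied per bundle and then once more over the partial results. The names `HierarchicalCombine`, `combineInLayers`, and `fromBinary` are hypothetical, and `fromBinary` is just one way a binary-function variant for lambdas could be lifted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical helper; plain Java, not Beam.
final class HierarchicalCombine {
    // Combine each bundle on its own, then combine the partial results
    // with the same function. This only type-checks because the function
    // maps Iterable<A> to A.
    static <A> A combineInLayers(List<List<A>> bundles,
                                 Function<Iterable<A>, A> combiner) {
        List<A> partials = new ArrayList<>();
        for (List<A> bundle : bundles) {
            partials.add(combiner.apply(bundle));
        }
        return combiner.apply(partials);
    }

    // Lift a binary function (convenient for lambdas) to the Iterable form.
    // Assumes the iterable is non-empty.
    static <A> Function<Iterable<A>, A> fromBinary(BinaryOperator<A> op) {
        return xs -> {
            Iterator<A> it = xs.iterator();
            A acc = it.next();
            while (it.hasNext()) {
                acc = op.apply(acc, it.next());
            }
            return acc;
        };
    }
}
```

With an associative and commutative function such as `Math::max` or `Integer::sum`, the layered result matches a single flat pass regardless of how the inputs were bundled.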

Here are some reasons for the particular design of CombineFn that actually
should be called out:

 - It is a major efficiency gain to mutate the accumulator.
 - Usually `init` is trivial and best to inline, hence addInput(AccumT,
InputT)
 - With `compact` we allow multiple physical representations of the same
semantic accumulator, and a hook to switch between them
 - And it is hard to take the user through the journey from the real
reasons behind it to the particular Java interface
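
As an illustration of the first three points, here is a small top-k sketch in plain Java. It is a hypothetical stand-in that borrows CombineFn's method names, not Beam's actual class, and `TopKSketch` and `Accum` are invented for the example. The accumulator is mutated in place, `init` is folded into `addInput`, and `compact` swaps a large buffer for a smaller representation of the same semantic accumulator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in; plain Java, not Beam's CombineFn.
final class TopKSketch {
    // Mutable accumulator: an unsorted buffer of candidate values.
    static final class Accum {
        final List<Integer> buffer = new ArrayList<>();
    }

    private final int k;

    TopKSketch(int k) {
        this.k = k;
    }

    Accum createAccumulator() {
        return new Accum();
    }

    // init is inlined: mutate the accumulator in place rather than
    // allocating init(input) and merging it.
    Accum addInput(Accum accum, int input) {
        accum.buffer.add(input);
        return accum;
    }

    Accum mergeAccumulators(Iterable<Accum> accums) {
        Accum merged = createAccumulator();
        for (Accum a : accums) {
            merged.buffer.addAll(a.buffer);
        }
        return compact(merged);
    }

    // Same semantic accumulator, smaller physical representation:
    // keep only the k largest values, sorted in descending order.
    Accum compact(Accum accum) {
        accum.buffer.sort(Collections.reverseOrder());
        if (accum.buffer.size() > k) {
            accum.buffer.subList(k, accum.buffer.size()).clear();
        }
        return accum;
    }

    List<Integer> extractOutput(Accum accum) {
        return new ArrayList<>(compact(accum).buffer);
    }
}
```

A caller can invoke `compact` whenever it is convenient, for example before shipping an accumulator across the network, without changing the final output.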

Note also that CombineWithContext allows side inputs, which complicates the
formalities somewhat but doesn't change the intuition.

Kenn

On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <owenzhang1...@gmail.com> wrote:

> I'm a bit confused here because neither of them requires same type of
> input and output. Also, the Javadoc of Globally says "It is common for {@code
> InputT == OutputT}, but not required". If associative and commutative is
> expected, why don't they have restrictions like
> Combine.perKey(SerializableFunction)?
>
> I understand the motive and requirement behind Combine functions. I'm more
> asking about the user interface consistency.
> By the way, it's hard to know what Combine.Globally does from the name but
> that discussion should be put in another thread.
>
> Thanks for your patience here.
>
> Manu
>
> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>
> GlobalCombineFn and PerKeyCombineFn still expect an associative and
> commutative function when accumulating.
> GlobalCombineFn is shorthand for assigning everything to a single key,
> doing the combine, and then discarding the key and extracting the single
> output.
> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
> modify the accumulation in any way.
>
> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the
> requirements?
>
> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>
> For it to be considered a combiner, the function needs to be associative
> and commutative.
>
> The issue is that from an API perspective it would be easy to have a
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
> people in the data processing world expect that this
> parallelization/optimization is performed and thus exposing such a method
> would be dangerous as it would be breaking users' expectations so from the
> design perspective it is a hard requirement. If PCollections ever become
> ordered or gain other properties, these requirements may loosen but it
> seems unlikely in the short term.
>
> At this point, I think you're looking for a MapElements which you pass in a
> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
> OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
> OutputT> should be trivial.
>
>
> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's monoid rule)
> but possible and easier to write with
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
> difficult to provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> To be able to do such an optimization, it requires you to actually have
> three functions:
> InputT -> AccumulatorT : Creates the intermediate representation which
> allows for associative combining
> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
> AccumulatorT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, you're providing
> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
> identity functions.
>
> To be able to support a Combine.perKey which can go from Iterable<InputT>
> -> OutputT would require that this occurred within a single machine
> removing the parallelization benefits that runners provide and for almost
> all cases is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> > and
> > output to be of the same type while `Combine.PerKey` doesn't have this
> > restriction.
> >
> > Thanks,
> > Manu
> >
>
>
>
>
