Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
Thanks, guys. My original confusion was this: if the API allows me to have
different input and output types, why not make that easier? It's clear now.

Do you think it would be better to hide the interfaces we don't expect users
to use? The Combine API has tempted me to do more than it is meant for ;)

On Tue, Nov 1, 2016 at 12:03 PM Robert Bradshaw wrote:
> [...]
Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles wrote:
> [...]
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
> - It is a major efficiency gain to mutate the accumulator.
> - Usually `init` is trivial and best to inline, hence addInput(InputT,
>   AccumT)

I would add that often the map InputT -> AccumT is *non-trivial*, as is
AccumT -> AccumT, so AccumT + Input -> AccumT is preferable (both for
efficiency and code simplicity) for anything beyond trivial combiners.
FlumeJava, a predecessor to Beam that we took many lessons from, had an
explicit init rather than addInput, and that turned out to be a drawback
when implementing CombineFns.

> - With `compact` we allow multiple physical representations of the same
>   semantic accumulator, and a hook to switch between them
> [...]
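The preference for addInput over an explicit init can be sketched in plain Java. This is only an illustration, not Beam code: `MeanSketch` and `Accum` are hypothetical names, and the method names are borrowed from the CombineFn vocabulary used in this thread. The accumulator is mutated in place, so no intermediate accumulator is allocated per input element.

```java
import java.util.List;

// Plain-Java sketch; MeanSketch and Accum are hypothetical, not Beam types.
final class MeanSketch {
    // Mutable accumulator: a running sum and a count.
    static final class Accum {
        long sum;
        long count;
    }

    static Accum createAccumulator() {
        return new Accum();
    }

    // AccumT + InputT -> AccumT: folds one input into the accumulator in place.
    static Accum addInput(Accum acc, int input) {
        acc.sum += input;
        acc.count++;
        return acc;
    }

    // (AccumT, AccumT) -> AccumT: merges two partial results.
    static Accum mergeAccumulators(Accum a, Accum b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    // AccumT -> OutputT: the output type differs from the input type.
    static double extractOutput(Accum acc) {
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    // Accumulates one partition of the input, as a single worker might.
    static Accum accumulate(List<Integer> inputs) {
        Accum acc = createAccumulator();
        for (int x : inputs) {
            addInput(acc, x);
        }
        return acc;
    }

    public static void main(String[] args) {
        Accum left = accumulate(List.of(1, 2, 3));
        Accum right = accumulate(List.of(10));
        System.out.println(extractOutput(mergeAccumulators(left, right))); // prints 4.0
    }
}
```

With an explicit init, every input would first become its own `Accum` and then be merged, which is the extra allocation and code that addInput avoids.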
Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
Manu, I think your critique about user interface clarity is valid.
CombineFn conflates a few operations and is not that clear about what it is
doing or why. You seem to be concerned about CombineFn versus
SerializableFunction constructors for the Combine family of transforms. I
thought I'd respond from my own perspective, in case it is helpful. It is
mostly the same things that Luke has said. Let's ignore keys. I don't think
they change things much.

As you seem to already understand, a CombineFn is a convenient collapsed
representation of three functions:

    init    : InputT -> AccumT
    combiner: (AccumT, AccumT) -> AccumT
    extract : AccumT -> OutputT

And the real semantics:

    MapElements.via(init)
    Combine.via(combiner)
    MapElements.via(extract)

For starters, "associative" is not even a well-typed word to use unless
input type and output type are the same. So it is `combiner` that needs to
be associative and commutative. Sometimes `combiner` also has an identity
element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
things here (the latter is never meaningfully used). When we say a
CombineFn has to be "associative" and "commutative" we just mean that it
can be factored into these methods.

So the SerializableFunction just needs to be factorable into these methods,
too, like Luke said. Pragmatically, if we only have a
SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
do hierarchical combines (can't feed the output of one layer into the next
layer), so associativity is irrelevant and it might as well be a
MapElements. So it only makes sense to allow
SerializableFunction<Iterable<V>, V>. Some variant that is a binary
function would make sense for lambdas, etc.

Here are some reasons for the particular design of CombineFn that actually
should be called out:

 - It is a major efficiency gain to mutate the accumulator.
 - Usually `init` is trivial and best to inline, hence addInput(InputT,
   AccumT)
 - With `compact` we allow multiple physical representations of the same
   semantic accumulator, and a hook to switch between them
 - And it is hard to take the user through the journey from the real
   reasons behind it to the particular Java interface

Note also that CombineWithContext allows side inputs, which complicates the
formalities somewhat but doesn't change the intuition.

Kenn

On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang wrote:
> [...]
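The init / combiner / extract factoring described in this message can be sketched without Beam, using `java.util.function` types. Everything here is an assumption made for illustration: `DistinctCountSketch` is a hypothetical name, and a distinct count was picked only because its three types differ (InputT = String, AccumT = Set<String>, OutputT = Integer). Because set union is associative and commutative, combining per partition and then combining the partial results gives the same answer as one flat pass.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java sketch; DistinctCountSketch is a hypothetical name, not a Beam type.
final class DistinctCountSketch {
    // init : InputT -> AccumT
    static final Function<String, Set<String>> INIT = s -> {
        Set<String> acc = new HashSet<>();
        acc.add(s);
        return acc;
    };

    // combiner : (AccumT, AccumT) -> AccumT; set union is associative and commutative.
    static final BinaryOperator<Set<String>> COMBINER = (a, b) -> {
        Set<String> out = new HashSet<>(a);
        out.addAll(b);
        return out;
    };

    // extract : AccumT -> OutputT
    static final Function<Set<String>, Integer> EXTRACT = Set::size;

    // First layer: combine one partition. The empty set is the identity element.
    static Set<String> combinePartition(List<String> partition) {
        Set<String> acc = new HashSet<>();
        for (String s : partition) {
            acc = COMBINER.apply(acc, INIT.apply(s));
        }
        return acc;
    }

    // Second layer: combine the partial accumulators, then extract once at the end.
    static int distinctCount(List<List<String>> partitions) {
        Set<String> total = new HashSet<>();
        for (List<String> partition : partitions) {
            total = COMBINER.apply(total, combinePartition(partition));
        }
        return EXTRACT.apply(total);
    }

    public static void main(String[] args) {
        // Same elements, partitioned two different ways.
        System.out.println(distinctCount(List.of(List.of("a", "b"), List.of("b", "c")))); // prints 3
        System.out.println(distinctCount(List.of(List.of("a", "b", "b", "c"))));          // prints 3
    }
}
```

Only `COMBINER` is applied at both layers, which is why it is the piece that has to be associative and commutative, while `INIT` and `EXTRACT` are ordinary element-wise maps.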
Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
I'm a bit confused here because neither of them requires the same type of
input and output. Also, the Javadoc of Globally says "It is common for
{@code InputT == OutputT}, but not required". If associativity and
commutativity are expected, why don't they have restrictions like
Combine.perKey(SerializableFunction)?

I understand the motive and requirement behind Combine functions. I'm
asking more about user interface consistency.
By the way, it's hard to know what Combine.Globally does from the name, but
that discussion should be put in another thread.

Thanks for your patience here.

Manu

On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik wrote:
> [...]
Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
GlobalCombineFn and PerKeyCombineFn still expect an associative and
commutative function when accumulating.
GlobalCombineFn is shorthand for assigning everything to a single key,
doing the combine, and then discarding the key and extracting the single
output.
PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
modify the accumulation in any way.

On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang wrote:
> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the
> requirement?
>
> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik wrote:
>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that from an API perspective it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But
>> many people in the data processing world expect that this
>> parallelization/optimization is performed, and thus exposing such a
>> method would be dangerous as it would break users' expectations, so from
>> the design perspective it is a hard requirement. If PCollections ever
>> become ordered or gain other properties, these requirements may loosen,
>> but it seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements which you pass
>> in a SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>,
>> KV<K, OutputT>> which can delegate to a
>> SerializableFunction<Iterable<InputT>, OutputT> should be trivial.
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang wrote:
>> [...]
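The wrapper suggested in the quoted Oct 29 message, one that takes a grouped key/values pair and delegates to a function over the values, might look roughly like the following in plain Java. This is a sketch under stated assumptions rather than Beam code: `java.util.Map.Entry` stands in for Beam's KV, `java.util.function.Function` stands in for SerializableFunction, and `PerKeyWrapperSketch`, `perKey`, and `describe` are hypothetical names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch; Map.Entry stands in for KV and Function for SerializableFunction.
final class PerKeyWrapperSketch {
    // Wraps a function over a key's values into a function over the (key, values) pair.
    // The key passes through unchanged; only the values are handed to the delegate.
    static <K, InputT, OutputT>
            Function<Map.Entry<K, Iterable<InputT>>, Map.Entry<K, OutputT>> perKey(
                    Function<Iterable<InputT>, OutputT> fn) {
        return kv -> new SimpleImmutableEntry<>(kv.getKey(), fn.apply(kv.getValue()));
    }

    // Example delegate whose output type (String) differs from its input type (Integer).
    static String describe(Iterable<Integer> values) {
        int n = 0;
        for (int ignored : values) {
            n++;
        }
        return n + " values";
    }

    public static void main(String[] args) {
        Function<Map.Entry<String, Iterable<Integer>>, Map.Entry<String, String>> wrapped =
                perKey(PerKeyWrapperSketch::describe);
        Map.Entry<String, Iterable<Integer>> grouped =
                new SimpleImmutableEntry<>("k", List.of(1, 2, 3));
        Map.Entry<String, String> out = wrapped.apply(grouped);
        System.out.println(out.getKey() + " -> " + out.getValue()); // prints k -> 3 values
    }
}
```

In a pipeline, such a wrapped function would run after the grouping step, so all values for a key are processed in one place and no partial combining takes place.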
Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
Thanks for the thorough explanation. I see the benefits of such a function.
My follow-up question is whether this is a hard requirement.
There are computations that don't satisfy this (I think it's the monoid
rule) but are possible and easier to write with
Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
difficult to provide an underlying CombineFn.

On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik wrote:

> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> To be able to do such an optimization, it requires you to actually have
> three functions:
>   InputT -> AccumulatorT: Creates the intermediate representation which
>     allows for associative combining
>   Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
>   AccumulatorT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, you're providing
> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
> identity functions.
>
> To be able to support a Combine.perKey which can go from
> Iterable<InputT> -> OutputT would require that this occurred within a
> single machine, removing the parallelization benefits that runners provide,
> and for almost all cases that is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> > and output to be of the same type while `Combine.PerKey` doesn't have
> > this restriction.
> >
> > Thanks,
> > Manu
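The effect of running the combine across several machines can be illustrated with a small plain-Java sketch. It assumes a two-level execution in which the same Iterable<V> -> V function is applied first to each partition and then to the partial results; `ReapplySketch`, `twoLevel`, `sum`, and `mean` are hypothetical names, and how a real runner splits the work is not specified in this thread. A sum survives re-application, while a naive mean does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch; ReapplySketch is a hypothetical name, not a Beam type.
final class ReapplySketch {
    // Iterable<V> -> V that is safe to re-apply: the sum of partial sums is the total sum.
    static double sum(Iterable<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    // Iterable<V> -> V that is NOT safe to re-apply: a mean of means weights partitions equally.
    static double mean(Iterable<Double> values) {
        double total = 0;
        int count = 0;
        for (double v : values) {
            total += v;
            count++;
        }
        return total / count;
    }

    // Applies fn to each partition, then applies fn again to the partial results.
    static double twoLevel(Function<Iterable<Double>, Double> fn, List<List<Double>> partitions) {
        List<Double> partials = new ArrayList<>();
        for (List<Double> partition : partitions) {
            partials.add(fn.apply(partition));
        }
        return fn.apply(partials);
    }

    public static void main(String[] args) {
        List<Double> all = List.of(1.0, 2.0, 3.0, 10.0);
        List<List<Double>> partitions = List.of(List.of(1.0, 2.0, 3.0), List.of(10.0));

        System.out.println(sum(all));                                   // prints 16.0
        System.out.println(twoLevel(ReapplySketch::sum, partitions));   // prints 16.0
        System.out.println(mean(all));                                  // prints 4.0
        System.out.println(twoLevel(ReapplySketch::mean, partitions));  // prints 6.0
    }
}
```

A correct distributed mean needs an accumulator that carries both sum and count, which is the case where input, accumulator, and output types differ and a full CombineFn is the better fit.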