On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:
> Ah! That is exactly the kind of primitive I was looking for but thought
> didn't exist. Thanks for pointing it out. Yeah that works well for me, I'll
> use that in my combiners (with an API of `PerGroupedValues`). Thanks!
>
> If we did want to add `PerGroupedValues` to our current combiners I'd also
> be happy to put up a PR doing that
>

I don't see why not. I'd run it by dev@ for naming ideas. PerGroup is another
possibility.

>
> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <valen...@google.com>
> wrote:
>
>> The closest primitive to that intent seems to be CombineValues:
>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>> , and you should be able to write:
>>
>> ```
>> max_sample_size = 100_000
>> (keyed_nums
>>  | GroupByKey()
>>  | MapTuple(lambda k, nums: (k, list(nums)[:max_sample_size]))
>>  | CombineValues(MeanCombineFn()))
>> ```
>> Would that work for other scenarios you have in mind?
>>
>> Haven't thought too much about this but from looking at
>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
>>
>>
>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> It feels more natural because it's only using GroupByKey once instead of
>>> once per combiner. Which I think is still more efficient even accounting
>>> for combiner lifting (unless there's some kind of pipeline optimization
>>> that merges multiple groupbykey's on the same pcollection into a single
>>> GBK).
>>>
>>> You can imagine a different use case where this pattern might arise that
>>> isn't just trying to reduce GBKs though. For example:
>>>
>>> ```
>>> max_sample_size = 100_000
>>> (keyed_nums
>>>  | GroupByKey()
>>>  | MapTuple(lambda k, nums: (k, list(nums)[:max_sample_size]))
>>>  # | ?? Mean.PerGrouped()?
>>> )
>>> ```
>>>
>>> To take the mean of every grouped_values using current combiners, I
>>> think you'd have to use an inverted groupbykey and then call
>>> `Mean.PerKey()` unless I'm missing something.
>>>
>>> (I recognize that writing a Map that takes a mean is simple enough, but
>>> in a real use case we might have a more complicated combiner)
>>>
>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>> user@beam.apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Just curious if this pattern comes up for others and if people have
>>>>> worked out a good convention.
>>>>>
>>>>> There are many combiners and a lot of them have two forms: a global
>>>>> form (e.g. Count.Globally) and a per key form (e.g. Count.PerKey). These
>>>>> are convenient but it feels like often we're running into the case where
>>>>> we GroupBy a set of data once and then wish to perform a series of
>>>>> combines on them, in which case neither of these forms work, and it
>>>>> calls for another form which operates on pre-grouped KVs.
>>>>>
>>>>> Contrived example: maybe you have a pcollection of keyed numbers and
>>>>> you want to calculate some summary statistics on them. You could do:
>>>>> ```
>>>>> keyed_means = (keyed_nums
>>>>>                | Mean.PerKey())
>>>>> keyed_counts = (keyed_nums
>>>>>                 | Count.PerKey())
>>>>> ... # other combines
>>>>> ```
>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>
>>>>
>>>> Does it feel more natural because it feels as though it would be more
>>>> performant? Because it seems like it adds an extra grouping step to the
>>>> pipeline code, which otherwise might not be necessary.
>>>> Note that Dataflow has the "combiner lifting" optimization, and the
>>>> combiner-specified reduction happens before the data is written into
>>>> shuffle as much as possible:
>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>> .
>>>>
>>>>
>>>>> ```
>>>>> grouped_nums = keyed_nums | GroupByKey()
>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>> ```
>>>>> But these "PerGrouped" variants don't actually currently exist. Does
>>>>> anyone else run into this pattern often? I might be missing an obvious
>>>>> pattern here.
>>>>>
>>>>> --
>>>>>
>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>
>>>>> *he/him*
>>>>>
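
For anyone reading this thread later: the idea of combining over values that are already grouped can be illustrated without Beam at all. The sketch below is plain Python, not the Beam API. `MeanFn` and `combine_grouped_values` are hypothetical names made up for illustration; they only mimic the CombineFn method names (create_accumulator, add_input, extract_output) to show roughly what applying a combiner to pre-grouped (key, values) pairs amounts to. It leaves out merge_accumulators and anything runner-specific.

```python
class MeanFn:
    """Hypothetical combiner that mimics the CombineFn method names."""

    def create_accumulator(self):
        # Accumulator is (running sum, element count).
        return (0.0, 0)

    def add_input(self, acc, value):
        total, count = acc
        return (total + value, count + 1)

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


def combine_grouped_values(grouped, combine_fn):
    """Apply combine_fn to each (key, values) pair, one pass per group."""
    results = []
    for key, values in grouped:
        acc = combine_fn.create_accumulator()
        for value in values:
            acc = combine_fn.add_input(acc, value)
        results.append((key, combine_fn.extract_output(acc)))
    return results


max_sample_size = 2
# Stand-in for the output of a GroupByKey: (key, list of values) pairs.
grouped_nums = [("a", [1.0, 3.0, 100.0]), ("b", [4.0])]
# Truncate each group first, as in the thread's example, then combine.
sampled = [(k, nums[:max_sample_size]) for k, nums in grouped_nums]
means = combine_grouped_values(sampled, MeanFn())
print(means)
```

The same `sampled` groups could be passed through a second combiner (a count, say) without regrouping, which is the reuse the original question was after.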