The closest primitive to that intent seems to be CombineValues: https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010, and you should be able to write:
```
max_sample_size = 100_000
(keyed_nums
 | GroupByKey()
 | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
 | CombineValues(MeanCombineFn()))
```

Would that work for other scenarios you have in mind?

Haven't thought too much about this, but from looking at
https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
(A rough sketch of how such a transform could be spelled today is included
below the quoted thread.)

On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> It feels more natural because it's only using GroupByKey once instead of
> once per combiner, which I think is still more efficient even accounting
> for combiner lifting (unless there's some kind of pipeline optimization
> that merges multiple GroupByKeys on the same pcollection into a single
> GBK).
>
> You can imagine a different use case where this pattern might arise that
> isn't just about reducing GBKs, though. For example:
>
> ```
> max_sample_size = 100_000
> (keyed_nums
>  | GroupByKey()
>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>  | #?? Mean.PerGrouped()?
> ```
>
> To take the mean of every group's values using the current combiners, I
> think you'd have to use an inverted GroupByKey and then call
> `Mean.PerKey()`, unless I'm missing something.
>
> (I recognize that writing a Map that takes a mean is simple enough, but in
> a real use case we might have a more complicated combiner.)
>
> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
> user@beam.apache.org> wrote:
>
>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> Just curious if this pattern comes up for others and if people have
>>> worked out a good convention.
>>>
>>> There are many combiners, and a lot of them have two forms: a global
>>> form (e.g. Count.Globally) and a per-key form (e.g. Count.PerKey).
>>> These are convenient, but we often seem to run into the case where we
>>> GroupByKey a set of data once and then wish to perform a series of
>>> combines on the groups, in which case neither of these forms works,
>>> which begs for a third form that operates on pre-grouped KVs.
>>>
>>> Contrived example: maybe you have a pcollection of keyed numbers and
>>> you want to calculate some summary statistics on them. You could do:
>>> ```
>>> keyed_means = (keyed_nums
>>>                | Mean.PerKey())
>>> keyed_counts = (keyed_nums
>>>                 | Count.PerKey())
>>> ...  # other combines
>>> ```
>>> But it'd feel more natural to pre-group the pcollection.
>>>
>>
>> Does it feel more natural because it feels as though it would be more
>> performant? Because it seems like it adds an extra grouping step to the
>> pipeline code, which otherwise might not be necessary. Note that Dataflow
>> has the "combiner lifting" optimization, and the combiner-specified
>> reduction happens before the data is written into shuffle as much as
>> possible:
>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>
>>> ```
>>> grouped_nums = keyed_nums | GBK()
>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>> ```
>>> But these "PerGrouped" variants don't currently exist. Does anyone else
>>> run into this pattern often? I might be missing an obvious pattern here.
>>>
>>> --
>>>
>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>
>>> *he/him*
>>>
>>> [image: Schrödinger, Inc.] <https://schrodinger.com/>
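For completeness, here is a minimal sketch of what a user-level "per grouped
values" mean could look like today, built only on the existing CombineValues
and MeanCombineFn. The MeanPerGroupedValues name and the small example
pipeline around it are purely illustrative and are not part of the Beam API:

```
import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn


class MeanPerGroupedValues(beam.PTransform):
    """Hypothetical 'Mean.PerGroupedValues'.

    Takes the (key, iterable_of_values) pairs produced by an upstream
    GroupByKey and emits (key, mean_of_values). It is only a thin wrapper
    over the existing CombineValues primitive; the name is illustrative
    and does not exist in Beam today.
    """

    def expand(self, pcoll):
        return pcoll | beam.CombineValues(MeanCombineFn())


if __name__ == "__main__":
    with beam.Pipeline() as p:
        grouped_nums = (
            p
            | beam.Create([("a", 1), ("a", 3), ("b", 10)])
            | beam.GroupByKey())  # -> ("a", [1, 3]), ("b", [10])

        # -> ("a", 2.0), ("b", 10.0)
        keyed_means = grouped_nums | MeanPerGroupedValues()
        keyed_means | beam.Map(print)
```

A count or any other per-grouped combine would presumably look the same, just
with a different CombineFn (e.g. CountCombineFn) passed to CombineValues, so
several summaries could share a single GroupByKey as in the original question.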