The closest primitive to that intent seems to be CombineValues:
https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
, and you should be able to write:

```
max_sample_size = 100_000
( keyed_nums
 | GroupByKey()
 | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
 | CombineValues(MeanCombineFn()))
```
Would that work for other scenarios you have in mind?
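
For what it's worth, a fuller sketch of that (untested; the example elements
and transform labels are just placeholders) that reuses the same grouped
PCollection for more than one CombineValues step:

```
import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn

max_sample_size = 100_000

with beam.Pipeline() as p:
  keyed_nums = p | beam.Create([('a', 1), ('a', 2), ('b', 3)])
  grouped = (
      keyed_nums
      | beam.GroupByKey()
      # Truncate each group to at most max_sample_size values.
      | beam.Map(lambda k_nums: (k_nums[0],
                                 list(k_nums[1])[:max_sample_size])))
  # One GroupByKey, several combines over the already-grouped values.
  keyed_means = grouped | 'Mean' >> beam.CombineValues(MeanCombineFn())
  keyed_counts = grouped | 'Count' >> beam.CombineValues(CountCombineFn())
```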

Haven't thought too much about this but from looking at
https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
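
A hypothetical Mean.GroupedValues could probably just be a thin wrapper around
CombineValues, along these lines (sketch only; the class name follows the
proposal above and nothing like it exists in Beam today):

```
import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn


class MeanGroupedValues(beam.PTransform):
  """Hypothetical: takes (key, iterable of numbers), emits (key, mean)."""

  def expand(self, pcoll):
    # Reuse the existing MeanCombineFn on already-grouped values.
    return pcoll | beam.CombineValues(MeanCombineFn())
```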


On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com>
wrote:

> It feels more natural because it's only using GroupByKey once instead of
> once per combiner, which I think is still more efficient even accounting
> for combiner lifting (unless there's some kind of pipeline optimization
> that merges multiple GroupByKeys on the same PCollection into a single
> GBK).
>
> You can imagine a different use case where this pattern might arise that
> isn't just trying to reduce GBKs though. For example:
>
> ```
> max_sample_size = 100_000
> ( keyed_nums
>  | GroupByKey()
>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>  | #??  Mean.PerGrouped()?
> ```
>
> To take the mean of each group's values using the current combiners, I
> think you'd have to "invert" the GroupByKey back into KV pairs and then
> call `Mean.PerKey()`, unless I'm missing something.
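>
> (Roughly, as a sketch of that workaround: "un-group" back into KV pairs and
> reuse the existing per-key combiner; `grouped_nums` is from the snippet
> above.)
>
> ```
> keyed_means = (
>     grouped_nums  # (key, iterable_of_nums) after the GroupByKey above
>     | FlatMap(lambda k_nums: [(k_nums[0], v) for v in k_nums[1]])
>     | Mean.PerKey())
> ```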
>
> (I recognize that writing a Map that takes a mean is simple enough, but in
> a real use case we might have a more complicated combiner)
>
> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
> user@beam.apache.org> wrote:
>
>>
>>
>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> Hey all,
>>>
>>> Just curious if this pattern comes up for others and if people have
>>> worked out a good convention.
>>>
>>> There are many combiners, and a lot of them have two forms: a global form
>>> (e.g. Count.Globally) and a per-key form (e.g. Count.PerKey). These are
>>> convenient, but we often run into the case where we GroupBy a set of data
>>> once and then want to perform a series of combines on the groups. Neither
>>> of these forms works for that, which calls for another form that operates
>>> on pre-grouped KVs.
>>>
>>> Contrived example: maybe you have a pcollection of keyed numbers and you
>>> want to calculate some summary statistics on them. You could do:
>>> ```
>>> keyed_means = (keyed_nums
>>>  | Mean.PerKey())
>>> keyed_counts = (keyed_nums
>>>  | Count.PerKey())
>>> ... # other combines
>>> ```
>>> But it'd feel more natural to pre-group the pcollection.
>>>
>>
>> Does it feel more natural because it feels as though it would be more
>> performant? It seems like it adds an extra grouping step to the pipeline
>> code, which might otherwise not be necessary. Note that Dataflow has the
>> "combiner lifting" optimization, so the combiner-specified reduction
>> happens before the data is written into shuffle as much as possible:
>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>> .
>>
>>
>>> ```
>>> grouped_nums = keyed_nums | GBK()
>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>> ```
>>> But these "PerGrouped" variants don't actually currently exist. Does
>>> anyone else run into this pattern often? I might be missing an obvious
>>> pattern here.
>>>
>>> --
>>>
>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>
>>> *he/him*
>>>
>>> Schrödinger, Inc. <https://schrodinger.com/>
>>>
>>
