The key in GroupIntoBatches is not actually semantically meaningful, and
for a batch pipeline the use of state/timers is not needed either. If all
you need to do is batch elements into groups of (at most) N, you can write
a DoFn that collects elements in its process method and emits them when the
batch is full, and that also flushes any partial batch in its finish bundle
method. Some care is needed there to handle windowing correctly: finish
bundle has no current element (and so no current window), so each buffered
batch has to be emitted with an explicit timestamp and window.

On the other hand, if you're trying to limit parallelism across all workers,
you'd likely need to limit the number of concurrently processed keys. That
requires grouping of some sort onto a finite number of keys, unless you want
to cap your entire pipeline at a certain number of workers.
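The two ideas above can be modeled as follows. This is a minimal sketch, not Beam code. It assumes Python and models the DoFn lifecycle (start_bundle, process, finish_bundle) with a plain class so it runs without the Beam SDK. The names BatchingFn, run_bundle and shard_key are hypothetical, not Beam APIs.

Buffers are kept per window because finish bundle has no current window, so each partial batch must carry the window it was collected in. In real Beam Java, that would be FinishBundleContext.output(value, timestamp, window). shard_key illustrates the second idea: it bounds the number of distinct keys before a keyed/stateful step.

```python
import zlib
from collections import defaultdict
from typing import Any, Dict, Hashable, Iterator, List, Tuple

# (window, batch) pair; a real DoFn would instead emit the batch into that
# window with an explicit timestamp.
Emitted = Tuple[Hashable, List[Any]]


class BatchingFn:
    """Hypothetical model of a stateless batching DoFn (no state, no timers)."""

    def __init__(self, max_batch_size: int) -> None:
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be >= 1")
        self.max_batch_size = max_batch_size
        self._buffers: Dict[Hashable, List[Any]] = defaultdict(list)

    def start_bundle(self) -> None:
        # Buffers only live for the duration of one bundle.
        self._buffers = defaultdict(list)

    def process(self, element: Any, window: Hashable) -> Iterator[Emitted]:
        # Buffer per window so a batch never mixes elements from different windows.
        buf = self._buffers[window]
        buf.append(element)
        if len(buf) >= self.max_batch_size:
            yield window, list(buf)
            buf.clear()

    def finish_bundle(self) -> Iterator[Emitted]:
        # Flush partial batches, each tagged with the window it was buffered under.
        for window, buf in self._buffers.items():
            if buf:
                yield window, list(buf)
        self._buffers = defaultdict(list)


def run_bundle(fn: BatchingFn, bundle: List[Tuple[Any, Hashable]]) -> List[Emitted]:
    """Drives one bundle through the lifecycle, roughly as a runner would."""
    fn.start_bundle()
    out: List[Emitted] = []
    for element, window in bundle:
        out.extend(fn.process(element, window))
    out.extend(fn.finish_bundle())
    return out


def shard_key(element: Any, num_shards: int) -> int:
    """Hypothetical helper mapping an element onto one of num_shards keys.

    Keying by this before a stateful step means at most num_shards keys
    (and so at most num_shards concurrently processed keys) exist.
    """
    # crc32 is stable across processes, unlike Python's randomized str hash().
    return zlib.crc32(repr(element).encode("utf-8")) % num_shards
```

For example, with max_batch_size=2, the bundle [(1, "w1"), (2, "w1"), (3, "w1"), (4, "w2")] yields [("w1", [1, 2]), ("w1", [3]), ("w2", [4])]. The last two batches are partial and come from finish_bundle.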

On Thu, May 25, 2023 at 2:34 PM Evan Galpin <egal...@apache.org> wrote:

> Understood, thanks for the clarification; I'll need to look more in depth
> at my pipeline code, then. I'm definitely observing that none of the steps
> downstream of the stateful step in my pipeline start until the steps
> upstream of it have fully completed. The stateful step is a RateLimit[1]
> transform which borrows heavily from GroupIntoBatches.
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>
> On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
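>> GbkBeforeStatefulParDo is an implementation detail used to send all
>> elements with the same key to the same worker, so that they can share
>> state (state is partitioned by key, and each key is processed by a single
>> worker). This does cause a global barrier in batch pipelines: as with any
>> GroupByKey, nothing downstream of it starts until all of its input has
>> been processed.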
>>
>> On Thu, May 25, 2023 at 2:15 PM Evan Galpin <egal...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I'm running into a scenario where I feel that Dataflow overrides
>>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo) are
>>> unnecessarily causing a batch pipeline to "pause" throughput, since a GBK
>>> needs to have processed all the data in a window before it can output.
>>>
>>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>>> to protect against, and how can it be bypassed/disabled while still using
>>> DataflowRunner?
>>>
>>> Thanks,
>>> Evan
>>>
>>
