Re: Runner Bundling Strategies

2023-09-27 Thread Kenneth Knowles
On Wed, Sep 27, 2023 at 2:53 PM Robert Bradshaw via dev 
wrote:

> On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev 
> wrote:
>
>> DoFns are allowed to be non-deterministic, so they don't have to yield
>> the "same" output.
>>
>
> Yeah. I'm more thinking here that there's a set of outputs that are
> considered equivalently valid.
>

exactly, "the same as much as the user expects it to be the same" :-)


>
>
>> The example I'm thinking of is where users perform some "best-effort"
>> deduplication by creating a hashmap in StartBundle and removing duplicates.
>> This is usually done purely for performance to reduce shuffle size, as
>> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
>> FinishBundle, though it does require a StartBundle.
>>
>
> This is a good example--the presence of Start *or* Finish is enough to
> indicate that the bundle outputs cannot be committed totally independently.
>

+1 to this example - it demonstrates that @StartBundle as a way of
communicating with the runner is not redundant with lazy initialization.

Kenn


>
> On the other hand, if there's a Start but no Finish we could safely
> truncate (and retry) the outputs at any point and still get a
> valid-under-the-model result, which could play well with the checkpointing
> model of persistence. This could possibly allow for optimizations purely
> from static analysis of the DoFn.
>
>
>> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
>>> wrote:
>>>
 Yes, not including FinishBundle in ParDoPayload seems like a mistake.
 Though absence of FinishBundle doesn't mean that one can assume that
 elements in a bundle don't affect subsequent bundle elements (i.e. there
 might still be caching!)

>>>
>>> Well for a DoFn to be correct, it has to yield the same (or "the same as
>>> much as the user expects it to be the same") output regardless of order of
>>> processing or bundling so a runner or SDK harness can definitely take a
>>> bunch of elements and process them however it wants if there's
>>> no @FinishBundle. I think that's what Jan is getting at - adding
>>> a @FinishBundle is the user placing a new restriction on the runner.
>>> Technically probably have to include @StartBundle in that consideration.
>>>
>>> Kenn
>>>
>>>

 On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles 
 wrote:

>
>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as 1000 ms default bundle
>> "duration" for lower traffic Pipelines can be too much. We are also
>> probably missing some @ValidatesRunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains, the bundle vs. element life-cycle is
>> relevant only for cases where processing of element X can affect processing
>> of element Y later in the same bundle. Once this influence is ruled out
>> (i.e. no caching), this information can result in runner optimization that
>> yields better performance. Should we consider propagating this information
>> from user code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
> can get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was 
> a
> good reason that we did not do so. Maybe we (incorrectly) thought that 
> this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>
>>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark
>>> from updating as they are still in flight until after finish bundle.
>>> Therefore simply caching the records should always be watermark safe,
>>> regardless of the runner. You will only run into problems if you try and
>>> move timestamps "backwards" - which is why Beam strongly discourages 
>>> this.
>>>
>>> This is not aligned with FlinkRunner's 

Re: Runner Bundling Strategies

2023-09-27 Thread Robert Bradshaw via dev
On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev 
wrote:

> DoFns are allowed to be non-deterministic, so they don't have to yield the
> "same" output.
>

Yeah. I'm more thinking here that there's a set of outputs that are
considered equivalently valid.


> The example I'm thinking of is where users perform some "best-effort"
> deduplication by creating a hashmap in StartBundle and removing duplicates.
> This is usually done purely for performance to reduce shuffle size, as
> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
> FinishBundle, though it does require a StartBundle.
>

This is a good example--the presence of Start *or* Finish is enough to
indicate that the bundle outputs cannot be committed totally independently.

On the other hand, if there's a Start but no Finish we could safely
truncate (and retry) the outputs at any point and still get a
valid-under-the-model result, which could play well with the checkpointing
model of persistence. This could possibly allow for optimizations purely
from static analysis of the DoFn.


> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
>> wrote:
>>
>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>>> Though absence of FinishBundle doesn't mean that one can assume that
>>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>>> might still be caching!)
>>>
>>
>> Well for a DoFn to be correct, it has to yield the same (or "the same as
>> much as the user expects it to be the same") output regardless of order of
>> processing or bundling so a runner or SDK harness can definitely take a
>> bunch of elements and process them however it wants if there's
>> no @FinishBundle. I think that's what Jan is getting at - adding
>> a @FinishBundle is the user placing a new restriction on the runner.
>> Technically probably have to include @StartBundle in that consideration.
>>
>> Kenn
>>
>>
>>>
>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>>>


 On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:

> Hi Kenn and Reuven,
>
> I agree with all these points. The only issue here seems to be that
> FlinkRunner does not fulfill these constraints. This is a bug that can be
> fixed, though we need to change some defaults, as 1000 ms default bundle
> "duration" for lower traffic Pipelines can be too much. We are also
> probably missing some @ValidatesRunner tests for this. I created [1] and
> [2] to track this.
>
> One question still remains, the bundle vs. element life-cycle is
> relevant only for cases where processing of element X can affect processing
> of element Y later in the same bundle. Once this influence is ruled out
> (i.e. no caching), this information can result in runner optimization that
> yields better performance. Should we consider propagating this information
> from user code to the runner?
>
 Yes!

 This was the explicit goal of the move to annotation-driven DoFn in
 https://s.apache.org/a-new-dofn to make it so that the SDK and runner
 can get good information about what the DoFn requirements are.

 When there is no @FinishBundle method, the runner can make additional
 optimizations. This should have been included in the ParDoPayload in the
 proto when we moved to portable pipelines. I cannot remember if there was a
 good reason that we did not do so. Maybe we (incorrectly) thought that this
 was an issue that only the Java SDK harness needed to know about.

 Kenn


> [1] https://github.com/apache/beam/issues/28649
>
> [2] https://github.com/apache/beam/issues/28650
> On 9/25/23 18:31, Reuven Lax via dev wrote:
>
>
>
> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>
>>
>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>
>> Two separate things here:
>>
>> 1. Yes, a watermark can update in the middle of a bundle.
>> 2. The records in the bundle themselves will prevent the watermark
>> from updating as they are still in flight until after finish bundle.
>> Therefore simply caching the records should always be watermark safe,
>> regardless of the runner. You will only run into problems if you try and
>> move timestamps "backwards" - which is why Beam strongly discourages 
>> this.
>>
>> This is not aligned with FlinkRunner's implementation. And I
>> actually think it is not aligned conceptually. As mentioned, Flink does
>> not have the concept of bundles at all. It achieves fault tolerance via
>> checkpointing, essentially checkpoint barrier flowing from sources to
>> sinks, safely snapshotting state of each operator on the way. Bundles are
>> implemented as a somewhat arbitrary set of elements between two 
>> consecutive
>> checkpoints 

Re: Runner Bundling Strategies

2023-09-27 Thread Jan Lukavský
Understood, thanks. This is fairly unintuitive from the "checkpoint 
barrier" viewpoint, because when such a runner fails, it simply restarts 
from the checkpoint as if it were a fresh start - i.e. calling Setup. 
It makes sense that a bundle-based runner might not do that.


It seems to follow that we cannot infer any optimizations purely from 
static analysis of the DoFn. Should we consider adding an opt-out 
parameter for bundle atomicity (which also has implications) and for the 
bundle's in-flight element watermark hold? I'd say yes, because otherwise 
we might restrict some runners too much.
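
For illustration, one possible shape of such an opt-out - to be clear, 
nothing like this exists in Beam today; the annotation name and placement 
are invented:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical opt-out, NOT part of Beam: the DoFn author asserts that
    // bundle boundaries carry no semantics for this fn, releasing the
    // runner from bundle atomicity and from holding the watermark for
    // in-flight elements of an open bundle.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface BundleAgnostic {}

    // Usage sketch:
    // @BundleAgnostic
    // class MyStatelessFn extends DoFn<String, String> { ... }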


On 9/27/23 20:24, Reuven Lax via dev wrote:
Using Setup would cause data loss in this case. A runner can always 
retry a bundle, and I don't believe Setup is called again in this 
case. If the user initialized the hashmap in Setup, this would cause 
records to be completely lost whenever bundles retry.


On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský  wrote:

What is the reason to rely on StartBundle and not Setup in this
case? If the life-cycle of a bundle is not "closed" (i.e. start -
finish), then it seems to be ill-defined and Setup should do?
I'm trying to think of non-caching use-cases of
StartBundle-FinishBundle - are there such cases? I'd say yes, but
I'm struggling a bit to find a specific example that cannot be
solved using Setup or lazy init.

On 9/27/23 19:58, Reuven Lax via dev wrote:

DoFns are allowed to be non-deterministic, so they don't have to
yield the "same" output.

The example I'm thinking of is where users perform some
"best-effort" deduplication by creating a hashmap in StartBundle
and removing duplicates. This is usually done purely for
performance to reduce shuffle size, as opposed to a guaranteed
RemoveDuplicates. This scenario doesn't require FinishBundle,
though it does require a StartBundle.

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles
 wrote:



On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
 wrote:

Yes, not including FinishBundle in ParDoPayload seems
like a mistake. Though absence of FinishBundle doesn't
mean that one can assume that elements in a bundle don't
affect subsequent bundle elements (i.e. there might still
be caching!)


Well for a DoFn to be correct, it has to yield the same (or
"the same as much as the user expects it to be the same")
output regardless of order of processing or bundling so a
runner or SDK harness can definitely take a bunch of elements
and process them however it wants if there's
no @FinishBundle. I think that's what Jan is getting at -
adding a @FinishBundle is the user placing a new restriction
on the runner. Technically probably have to
include @StartBundle in that consideration.

Kenn


On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
 wrote:



On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
 wrote:

Hi Kenn and Reuven,

I agree with all these points. The only issue
here seems to be that FlinkRunner does not
fulfill these constraints. This is a bug that can
be fixed, though we need to change some defaults,
as 1000 ms default bundle "duration" for lower
traffic Pipelines can be too much. We are also
probably missing some @ValidatesRunner tests for
this. I created [1] and [2] to track this.

One question still remains, the bundle vs.
element life-cycle is relevant only for cases
where processing of element X can affect
processing of element Y later in the same bundle.
Once this influence is ruled out (i.e. no
caching), this information can result in
runner optimization that yields better performance.
Should we consider propagating this information
from user code to the runner?

Yes!

This was the explicit goal of the move to
annotation-driven DoFn in
https://s.apache.org/a-new-dofn to make it so that
the SDK and runner can get good information about
what the DoFn requirements are.

When there is no @FinishBundle method, the runner can
make additional optimizations. This should have been
included in the ParDoPayload in the proto when we
moved to portable pipelines. I cannot remember if
there was a good reason that we did not do so. Maybe
we (incorrectly) thought that this was an issue that
only the Java SDK harness 

Re: Runner Bundling Strategies

2023-09-27 Thread Reuven Lax via dev
Using Setup would cause data loss in this case. A runner can always retry a
bundle, and I don't believe Setup is called again in this case. If the user
initialized the hashmap in Setup, this would cause records to be completely
lost whenever bundles retry.
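
A minimal sketch of that failure mode (Java SDK; the class name is 
illustrative, and it assumes a runner that retries a bundle on the same 
DoFn instance without calling @Setup again):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.beam.sdk.transforms.DoFn;

    class LossyDedupFn extends DoFn<String, String> {
      private transient Set<String> seen;

      @Setup
      public void setup() {
        seen = new HashSet<>(); // BUG: survives across bundle retries
      }

      @ProcessElement
      public void processElement(@Element String e, OutputReceiver<String> out) {
        // On a retried bundle, elements output by the failed (discarded)
        // attempt are still in 'seen', so they are silently dropped.
        if (seen.add(e)) {
          out.output(e);
        }
      }
    }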

On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský  wrote:

> What is the reason to rely on StartBundle and not Setup in this case? If
> the life-cycle of a bundle is not "closed" (i.e. start - finish), then it
> seems to be ill-defined and Setup should do?
> I'm trying to think of non-caching use-cases of StartBundle-FinishBundle -
> are there such cases? I'd say yes, but I'm struggling a bit to find a
> specific example that cannot be solved using Setup or lazy init.
> On 9/27/23 19:58, Reuven Lax via dev wrote:
>
> DoFns are allowed to be non-deterministic, so they don't have to yield the
> "same" output.
>
> The example I'm thinking of is where users perform some "best-effort"
> deduplication by creating a hashmap in StartBundle and removing duplicates.
> This is usually done purely for performance to reduce shuffle size, as
> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
> FinishBundle, though it does require a StartBundle.
>
> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
>> wrote:
>>
>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>>> Though absence of FinishBundle doesn't mean that one can assume that
>>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>>> might still be caching!)
>>>
>>
>> Well for a DoFn to be correct, it has to yield the same (or "the same as
>> much as the user expects it to be the same") output regardless of order of
>> processing or bundling so a runner or SDK harness can definitely take a
>> bunch of elements and process them however it wants if there's
>> no @FinishBundle. I think that's what Jan is getting at - adding
>> a @FinishBundle is the user placing a new restriction on the runner.
>> Technically probably have to include @StartBundle in that consideration.
>>
>> Kenn
>>
>>
>>>
>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>>>


 On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:

> Hi Kenn and Reuven,
>
> I agree with all these points. The only issue here seems to be that
> FlinkRunner does not fulfill these constraints. This is a bug that can be
> fixed, though we need to change some defaults, as 1000 ms default bundle
> "duration" for lower traffic Pipelines can be too much. We are also
> probably missing some @ValidatesRunner tests for this. I created [1] and
> [2] to track this.
>
> One question still remains, the bundle vs. element life-cycle is
> relevant only for cases where processing of element X can affect processing
> of element Y later in the same bundle. Once this influence is ruled out
> (i.e. no caching), this information can result in runner optimization that
> yields better performance. Should we consider propagating this information
> from user code to the runner?
>
 Yes!

 This was the explicit goal of the move to annotation-driven DoFn in
 https://s.apache.org/a-new-dofn to make it so that the SDK and runner
 can get good information about what the DoFn requirements are.

 When there is no @FinishBundle method, the runner can make additional
 optimizations. This should have been included in the ParDoPayload in the
 proto when we moved to portable pipelines. I cannot remember if there was a
 good reason that we did not do so. Maybe we (incorrectly) thought that this
 was an issue that only the Java SDK harness needed to know about.

 Kenn


> [1] https://github.com/apache/beam/issues/28649
>
> [2] https://github.com/apache/beam/issues/28650
> On 9/25/23 18:31, Reuven Lax via dev wrote:
>
>
>
> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>
>>
>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>
>> Two separate things here:
>>
>> 1. Yes, a watermark can update in the middle of a bundle.
>> 2. The records in the bundle themselves will prevent the watermark
>> from updating as they are still in flight until after finish bundle.
>> Therefore simply caching the records should always be watermark safe,
>> regardless of the runner. You will only run into problems if you try and
>> move timestamps "backwards" - which is why Beam strongly discourages 
>> this.
>>
>> This is not aligned with FlinkRunner's implementation. And I
>> actually think it is not aligned conceptually. As mentioned, Flink does
>> not have the concept of bundles at all. It achieves fault tolerance via
>> checkpointing, essentially checkpoint barrier flowing from sources to
>> sinks, safely snapshotting state of each 

Re: Runner Bundling Strategies

2023-09-27 Thread Jan Lukavský
What is the reason to rely on StartBundle and not Setup in this case? If 
the life-cycle of a bundle is not "closed" (i.e. start - finish), then it 
seems to be ill-defined and Setup should do?
I'm trying to think of non-caching use-cases of 
StartBundle-FinishBundle - are there such cases? I'd say yes, but I'm 
struggling a bit to find a specific example that cannot be solved using 
Setup or lazy init.


On 9/27/23 19:58, Reuven Lax via dev wrote:
DoFns are allowed to be non-deterministic, so they don't have to yield 
the "same" output.


The example I'm thinking of is where users perform some "best-effort" 
deduplication by creating a hashmap in StartBundle and removing 
duplicates. This is usually done purely for performance to reduce 
shuffle size, as opposed to a guaranteed RemoveDuplicates. This 
scenario doesn't require FinishBundle, though it does require a 
StartBundle.


On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:



On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
 wrote:

Yes, not including FinishBundle in ParDoPayload seems like a
mistake. Though absence of FinishBundle doesn't mean that one
can assume that elements in a bundle don't affect subsequent
bundle elements (i.e. there might still be caching!)


Well for a DoFn to be correct, it has to yield the same (or "the
same as much as the user expects it to be the same") output
regardless of order of processing or bundling so a runner or SDK
harness can definitely take a bunch of elements and process them
however it wants if there's no @FinishBundle. I think that's what
Jan is getting at - adding a @FinishBundle is the user placing a
new restriction on the runner. Technically probably have to
include @StartBundle in that consideration.

Kenn


On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
 wrote:



On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
 wrote:

Hi Kenn and Reuven,

I agree with all these points. The only issue here
seems to be that FlinkRunner does not fulfill these
constraints. This is a bug that can be fixed, though
we need to change some defaults, as 1000 ms default
bundle "duration" for lower traffic Pipelines can be
too much. We are also probably missing some
@ValidatesRunner tests for this. I created [1] and
[2] to track this.

One question still remains, the bundle vs. element
life-cycle is relevant only for cases where processing
of element X can affect processing of element Y later
in the same bundle. Once this influence is ruled out
(i.e. no caching), this information can result in
runner optimization that yields better performance.
Should we consider propagating this information from
user code to the runner?

Yes!

This was the explicit goal of the move to
annotation-driven DoFn in https://s.apache.org/a-new-dofn
to make it so that the SDK and runner can get good
information about what the DoFn requirements are.

When there is no @FinishBundle method, the runner can make
additional optimizations. This should have been included
in the ParDoPayload in the proto when we moved to portable
pipelines. I cannot remember if there was a good reason
that we did not do so. Maybe we (incorrectly) thought that
this was an issue that only the Java SDK harness needed to
know about.

Kenn

[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:



On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský
 wrote:


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of
a bundle.
2. The records in the bundle themselves will
prevent the watermark from updating as they are
still in flight until after finish bundle.
Therefore simply caching the records should
always be watermark safe, regardless of the
runner. You will only run into problems if you
try and move timestamps "backwards" - which is
why Beam strongly discourages this.

This is not aligned with FlinkRunner's
implementation. And I actually think it is not
aligned conceptually. As mentioned, Flink does
not 

Re: Runner Bundling Strategies

2023-09-27 Thread Reuven Lax via dev
DoFns are allowed to be non-deterministic, so they don't have to yield the
"same" output.

The example I'm thinking of is where users perform some "best-effort"
deduplication by creating a hashmap in StartBundle and removing duplicates.
This is usually done purely for performance to reduce shuffle size, as
opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
FinishBundle, though it does require a StartBundle.
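
For readers following along, a minimal sketch of such a DoFn in the Java 
SDK (the class name is illustrative; no @FinishBundle is needed because 
discarding the map loses nothing but the dedup opportunity):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.beam.sdk.transforms.DoFn;

    class BestEffortDedupFn extends DoFn<String, String> {
      private transient Set<String> seen;

      @StartBundle
      public void startBundle() {
        // Re-created for every bundle, so a retried bundle starts clean.
        seen = new HashSet<>();
      }

      @ProcessElement
      public void processElement(@Element String e, OutputReceiver<String> out) {
        // Drops duplicates only within the current bundle; the output stays
        // valid however the runner chooses to bundle elements.
        if (seen.add(e)) {
          out.output(e);
        }
      }
    }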

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles  wrote:

>
>
> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
> wrote:
>
>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>> Though absence of FinishBundle doesn't mean that one can assume that
>> elements in a bundle don't affect subsequent bundle elements (i.e. there
>> might still be caching!)
>>
>
> Well for a DoFn to be correct, it has to yield the same (or "the same as
> much as the user expects it to be the same") output regardless of order of
> processing or bundling so a runner or SDK harness can definitely take a
> bunch of elements and process them however it wants if there's
> no @FinishBundle. I think that's what Jan is getting at - adding
> a @FinishBundle is the user placing a new restriction on the runner.
> Technically probably have to include @StartBundle in that consideration.
>
> Kenn
>
>
>>
>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>>>
 Hi Kenn and Reuven,

 I agree with all these points. The only issue here seems to be that
 FlinkRunner does not fulfill these constraints. This is a bug that can be
 fixed, though we need to change some defaults, as 1000 ms default bundle
 "duration" for lower traffic Pipelines can be too much. We are also
 probably missing some @ValidatesRunner tests for this. I created [1] and
 [2] to track this.

 One question still remains, the bundle vs. element life-cycle is
 relevant only for cases where processing of element X can affect processing
 of element Y later in the same bundle. Once this influence is ruled out
 (i.e. no caching), this information can result in runner optimization that
 yields better performance. Should we consider propagating this information
 from user code to the runner?

>>> Yes!
>>>
>>> This was the explicit goal of the move to annotation-driven DoFn in
>>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>>> can get good information about what the DoFn requirements are.
>>>
>>> When there is no @FinishBundle method, the runner can make additional
>>> optimizations. This should have been included in the ParDoPayload in the
>>> proto when we moved to portable pipelines. I cannot remember if there was a
>>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>>> was an issue that only the Java SDK harness needed to know about.
>>>
>>> Kenn
>>>
>>>
 [1] https://github.com/apache/beam/issues/28649

 [2] https://github.com/apache/beam/issues/28650
 On 9/25/23 18:31, Reuven Lax via dev wrote:



 On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark
> from updating as they are still in flight until after finish bundle.
> Therefore simply caching the records should always be watermark safe,
> regardless of the runner. You will only run into problems if you try and
> move timestamps "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially checkpoint barrier flowing from sources to
> sinks, safely snapshotting state of each operator on the way. Bundles are
> implemented as a somewhat arbitrary set of elements between two 
> consecutive
> checkpoints (there can be multiple bundles between checkpoints). A bundle
> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
> after the checkpoint barrier passes over the elements in the bundle (every
> bundle is finished at the very latest exactly before a checkpoint). But
> watermark propagation and bundle finalization are completely unrelated. This
> might be a bug in the runner, but requiring checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks, every executable stage will delay watermark propagation until 
> a
> checkpoint (which is typically the order of seconds). This delay would add
> up after each stage.
>

 

Re: Runner Bundling Strategies

2023-09-26 Thread Robert Bradshaw via dev
On Tue, Sep 26, 2023 at 10:33 AM Reuven Lax via dev 
wrote:

> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>

No reason we couldn't rectify this now. (We'd need to do it in such a way
that the absence of such an annotation isn't interpreted as an incorrect
value, of course.)

Though absence of FinishBundle doesn't mean that one can assume that
> elements in a bundle don't affect subsequent bundle elements (i.e. there
> might still be caching!)
>

Without the ability to handle the cached elements in a FinishBundle one
can't safely cache elements. Or, put another way, distributing the calls to
Process among any possible set of DoFn instances is perfectly valid,
including giving each element its own DoFn instance and invoking process()
once on each.
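
A sketch of what that static analysis could look like runner-side, using 
the Java SDK's reflection helpers (assuming DoFnSignature exposes the 
bundle methods as nullable entries, which is my reading of the current 
API; the class name is illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
    import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

    class BundleSensitivity {
      static boolean mayDependOnBundling(Class<? extends DoFn<?, ?>> fnClass) {
        DoFnSignature sig = DoFnSignatures.getSignature(fnClass);
        // With neither hook present, elements can be re-bundled, split, or
        // retried independently without changing observable behavior.
        return sig.startBundle() != null || sig.finishBundle() != null;
      }
    }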

On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>>
>>> Hi Kenn and Reuven,
>>>
>>> I agree with all these points. The only issue here seems to be that
>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>> fixed, though we need to change some defaults, as 1000 ms default bundle
>>> "duration" for lower traffic Pipelines can be too much. We are also
>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>> [2] to track this.
>>>
>>> One question still remains, the bundle vs. element life-cycle is
>>> relevant only for cases where processing of element X can affect processing
>>> of element Y later in the same bundle. Once this influence is ruled out
>>> (i.e. no caching), this information can result in runner optimization that
>>> yields better performance. Should we consider propagating this information
>>> from user code to the runner?
>>>
>> Yes!
>>
>> This was the explicit goal of the move to annotation-driven DoFn in
>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>> can get good information about what the DoFn requirements are.
>>
>> When there is no @FinishBundle method, the runner can make additional
>> optimizations. This should have been included in the ParDoPayload in the
>> proto when we moved to portable pipelines. I cannot remember if there was a
>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>> was an issue that only the Java SDK harness needed to know about.
>>
>> Kenn
>>
>>
>>> [1] https://github.com/apache/beam/issues/28649
>>>
>>> [2] https://github.com/apache/beam/issues/28650
>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>
>>>
>>>
>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>>

 On 9/23/23 18:16, Reuven Lax via dev wrote:

 Two separate things here:

 1. Yes, a watermark can update in the middle of a bundle.
 2. The records in the bundle themselves will prevent the watermark from
 updating as they are still in flight until after finish bundle. Therefore
 simply caching the records should always be watermark safe, regardless of
 the runner. You will only run into problems if you try and move timestamps
 "backwards" - which is why Beam strongly discourages this.

 This is not aligned with FlinkRunner's implementation. And I actually
 think it is not aligned conceptually. As mentioned, Flink does not have
 the concept of bundles at all. It achieves fault tolerance via
 checkpointing, essentially checkpoint barrier flowing from sources to
 sinks, safely snapshotting state of each operator on the way. Bundles are
 implemented as a somewhat arbitrary set of elements between two consecutive
 checkpoints (there can be multiple bundles between checkpoints). A bundle
 is 'committed' (i.e. persistently stored and guaranteed not to retry) only
 after the checkpoint barrier passes over the elements in the bundle (every
 bundle is finished at the very latest exactly before a checkpoint). But
 watermark propagation and bundle finalization are completely unrelated. This
 might be a bug in the runner, but requiring checkpoint for watermark
 propagation will introduce insane delays between processing time and
 watermarks, every executable stage will delay watermark propagation until a
 checkpoint (which is typically the order of seconds). This delay would add
 up after each stage.

>>>
>>> It's not bundles that hold up processing, rather it is elements, and
>>> elements are not considered "processed" until FinishBundle.
>>>
>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>> back to the last checkpoint, the watermark will also roll back, and
>>> everything stays consistent. So in general, one does not need to wait for
>>> checkpoints for watermark propagation.
>>>
>>> Where things get a bit weirder with Flink is whenever one has external
>>> side effects. In theory, one should wait for checkpoints before letting a
>>> Sink flush, otherwise one could end up with incorrect outputs (especially
>>> with a sink like TextIO). 

Re: Runner Bundling Strategies

2023-09-26 Thread Kenneth Knowles
On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev 
wrote:

> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
> Though absence of FinishBundle doesn't mean that one can assume that
> elements in a bundle don't affect subsequent bundle elements (i.e. there
> might still be caching!)
>

Well for a DoFn to be correct, it has to yield the same (or "the same as
much as the user expects it to be the same") output regardless of order of
processing or bundling so a runner or SDK harness can definitely take a
bunch of elements and process them however it wants if there's
no @FinishBundle. I think that's what Jan is getting at - adding
a @FinishBundle is the user placing a new restriction on the runner.
Technically probably have to include @StartBundle in that consideration.

Kenn


>
> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>>
>>> Hi Kenn and Reuven,
>>>
>>> I agree with all these points. The only issue here seems to be that
>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>> fixed, though we need to change some defaults, as 1000 ms default bundle
>>> "duration" for lower traffic Pipelines can be too much. We are also
>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>> [2] to track this.
>>>
>>> One question still remains, the bundle vs. element life-cycle is
>>> relevant only for cases where processing of element X can affect processing
>>> of element Y later in the same bundle. Once this influence is ruled out
>>> (i.e. no caching), this information can result in runner optimization that
>>> yields better performance. Should we consider propagating this information
>>> from user code to the runner?
>>>
>> Yes!
>>
>> This was the explicit goal of the move to annotation-driven DoFn in
>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>> can get good information about what the DoFn requirements are.
>>
>> When there is no @FinishBundle method, the runner can make additional
>> optimizations. This should have been included in the ParDoPayload in the
>> proto when we moved to portable pipelines. I cannot remember if there was a
>> good reason that we did not do so. Maybe we (incorrectly) thought that this
>> was an issue that only the Java SDK harness needed to know about.
>>
>> Kenn
>>
>>
>>> [1] https://github.com/apache/beam/issues/28649
>>>
>>> [2] https://github.com/apache/beam/issues/28650
>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>
>>>
>>>
>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>>

 On 9/23/23 18:16, Reuven Lax via dev wrote:

 Two separate things here:

 1. Yes, a watermark can update in the middle of a bundle.
 2. The records in the bundle themselves will prevent the watermark from
 updating as they are still in flight until after finish bundle. Therefore
 simply caching the records should always be watermark safe, regardless of
 the runner. You will only run into problems if you try and move timestamps
 "backwards" - which is why Beam strongly discourages this.

 This is not aligned with FlinkRunner's implementation. And I actually
 think it is not aligned conceptually. As mentioned, Flink does not have
 the concept of bundles at all. It achieves fault tolerance via
 checkpointing, essentially checkpoint barrier flowing from sources to
 sinks, safely snapshotting state of each operator on the way. Bundles are
 implemented as a somewhat arbitrary set of elements between two consecutive
 checkpoints (there can be multiple bundles between checkpoints). A bundle
 is 'committed' (i.e. persistently stored and guaranteed not to retry) only
 after the checkpoint barrier passes over the elements in the bundle (every
 bundle is finished at the very latest exactly before a checkpoint). But
 watermark propagation and bundle finalization are completely unrelated. This
 might be a bug in the runner, but requiring checkpoint for watermark
 propagation will introduce insane delays between processing time and
 watermarks, every executable stage will delay watermark propagation until a
 checkpoint (which is typically the order of seconds). This delay would add
 up after each stage.

>>>
>>> It's not bundles that hold up processing, rather it is elements, and
>>> elements are not considered "processed" until FinishBundle.
>>>
>>> You are right about Flink. In many cases this is fine - if Flink rolls
>>> back to the last checkpoint, the watermark will also roll back, and
>>> everything stays consistent. So in general, one does not need to wait for
>>> checkpoints for watermark propagation.
>>>
>>> Where things get a bit weirder with Flink is whenever one has external
>>> side effects. In theory, one should wait for checkpoints before letting a
>>> Sink flush, otherwise one could end up with incorrect 

Re: Runner Bundling Strategies

2023-09-26 Thread Reuven Lax via dev
Yes, not including FinishBundle in ParDoPayload seems like a mistake.
Though absence of FinishBundle doesn't mean that one can assume that
elements in a bundle don't affect subsequent bundle elements (i.e. there
might still be caching!)

On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles  wrote:

>
>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as 1000 ms default bundle
>> "duration" for lower traffic Pipelines can be too much. We are also
>> probably missing some @ValidatesRunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains, the bundle vs. element life-cycle is relevant
>> only for cases where processing of element X can affect processing of
>> element Y later in the same bundle. Once this influence is ruled out (i.e.
>> no caching), this information can result in runner optimization that yields
>> better performance. Should we consider propagating this information from user
>> code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
> get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was a
> good reason that we did not do so. Maybe we (incorrectly) thought that this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>
>>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark from
>>> updating as they are still in flight until after finish bundle. Therefore
>>> simply caching the records should always be watermark safe, regardless of
>>> the runner. You will only run into problems if you try and move timestamps
>>> "backwards" - which is why Beam strongly discourages this.
>>>
>>> This is not aligned with FlinkRunner's implementation. And I actually
>>> think it is not aligned conceptually. As mentioned, Flink does not have
>>> the concept of bundles at all. It achieves fault tolerance via
>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>> implemented as a somewhat arbitrary set of elements between two consecutive
>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>>> after the checkpoint barrier passes over the elements in the bundle (every
>>> bundle is finished at the very latest exactly before a checkpoint). But
>>> watermark propagation and bundle finalization are completely unrelated. This
>>> might be a bug in the runner, but requiring checkpoint for watermark
>>> propagation will introduce insane delays between processing time and
>>> watermarks, every executable stage will delay watermark propagation until a
>>> checkpoint (which is typically the order of seconds). This delay would add
>>> up after each stage.
>>>
>>
>> It's not bundles that hold up processing, rather it is elements, and
>> elements are not considered "processed" until FinishBundle.
>>
>> You are right about Flink. In many cases this is fine - if Flink rolls
>> back to the last checkpoint, the watermark will also roll back, and
>> everything stays consistent. So in general, one does not need to wait for
>> checkpoints for watermark propagation.
>>
>> Where things get a bit weirder with Flink is whenever one has external
>> side effects. In theory, one should wait for checkpoints before letting a
>> Sink flush, otherwise one could end up with incorrect outputs (especially
>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>> provide TwoPhaseCommitSinkFunction, which
>> waits for a checkpoint. In Beam, this is the reason we introduced
>> RequiresStableInput. Of course in practice many Flink users don't do this -
>> in which case they are prioritizing latency over data correctness.
>>
>>>
>>> Reuven
>>>
>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>>>
 > Watermarks shouldn't be (visibly) advanced until 

Re: Runner Bundling Strategies

2023-09-26 Thread Robert Burke
Oh neat, Preserving Keys. I didn't think we had/provided a mechanism for
declaring that.

Good doc.

I do know there's no annotation for FinishBundle. It's generally optional
and SDK-side only as a concern.

Finalize Bundle is a different mechanism which does have an annotation, and
requires both SDK and runner support to trigger after a bundle has been
committed/checkpointed.
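
For reference, a minimal Java sketch of that mechanism (the BundleFinalizer 
parameter; the class name, deadline, and callback body are illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class AckAfterCommitFn extends DoFn<String, String> {
      @ProcessElement
      public void process(@Element String e, OutputReceiver<String> out,
          BundleFinalizer finalizer) {
        out.output(e);
        // Runs only after the runner durably commits this bundle, e.g. to
        // ack messages back to an external source.
        finalizer.afterBundleCommit(
            Instant.now().plus(Duration.standardMinutes(5)),
            () -> { /* acknowledge externally */ });
      }
    }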


On Tue, Sep 26, 2023, 8:54 AM Kenneth Knowles  wrote:

>
>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as 1000 ms default bundle
>> "duration" for lower traffic Pipelines can be too much. We are also
>> probably missing some @ValidatesRunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains, the bundle vs. element life-cycle is relevant
>> only for cases where processing of element X can affect processing of
>> element Y later in the same bundle. Once this influence is ruled out (i.e.
>> no caching), this information can result in runner optimization that yields
>> better performance. Should we consider propagating this information from user
>> code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
> get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was a
> good reason that we did not do so. Maybe we (incorrectly) thought that this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>>
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>>
>>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark from
>>> updating as they are still in flight until after finish bundle. Therefore
>>> simply caching the records should always be watermark safe, regardless of
>>> the runner. You will only run into problems if you try and move timestamps
>>> "backwards" - which is why Beam strongly discourages this.
>>>
>>> This is not aligned with FlinkRunner's implementation. And I actually
>>> think it is not aligned conceptually. As mentioned, Flink does not have
>>> the concept of bundles at all. It achieves fault tolerance via
>>> checkpointing, essentially checkpoint barrier flowing from sources to
>>> sinks, safely snapshotting state of each operator on the way. Bundles are
>>> implemented as a somewhat arbitrary set of elements between two consecutive
>>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>>> after the checkpoint barrier passes over the elements in the bundle (every
>>> bundle is finished at the very latest exactly before a checkpoint). But
>>> watermark propagation and bundle finalization are completely unrelated. This
>>> might be a bug in the runner, but requiring checkpoint for watermark
>>> propagation will introduce insane delays between processing time and
>>> watermarks, every executable stage will delay watermark propagation until a
>>> checkpoint (which is typically the order of seconds). This delay would add
>>> up after each stage.
>>>
>>
>> It's not bundles that hold up processing, rather it is elements, and
>> elements are not considered "processed" until FinishBundle.
>>
>> You are right about Flink. In many cases this is fine - if Flink rolls
>> back to the last checkpoint, the watermark will also roll back, and
>> everything stays consistent. So in general, one does not need to wait for
>> checkpoints for watermark propagation.
>>
>> Where things get a bit weirder with Flink is whenever one has external
>> side effects. In theory, one should wait for checkpoints before letting a
>> Sink flush, otherwise one could end up with incorrect outputs (especially
>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>> provide TwoPhaseCommitSinkFunction, which
>> waits for a checkpoint. In Beam, this is the reason we introduced
>> RequiresStableInput. Of course in practice many Flink users don't do this -
>> in which case they are prioritizing latency over data correctness.
>>
>>>
>>> 

Re: Runner Bundling Strategies

2023-09-26 Thread Kenneth Knowles
On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský  wrote:

> Hi Kenn and Reuven,
>
> I agree with all these points. The only issue here seems to be that
> FlinkRunner does not fulfill these constraints. This is a bug that can be
> fixed, though we need to change some defaults, as 1000 ms default bundle
> "duration" for lower traffic Pipelines can be too much. We are also
> probably missing some @ValidatesRunner tests for this. I created [1] and
> [2] to track this.
>
> One question still remains, the bundle vs. element life-cycle is relevant
> only for cases where processing of element X can affect processing of
> element Y later in the same bundle. Once this influence is ruled out (i.e.
> no caching), this information can result in runner optimization that yields
> better performance. Should we consider propagating this information from user
> code to the runner?
>
Yes!

This was the explicit goal of the move to annotation-driven DoFn in
https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
get good information about what the DoFn requirements are.

When there is no @FinishBundle method, the runner can make additional
optimizations. This should have been included in the ParDoPayload in the
proto when we moved to portable pipelines. I cannot remember if there was a
good reason that we did not do so. Maybe we (incorrectly) thought that this
was an issue that only the Java SDK harness needed to know about.

Kenn


> [1] https://github.com/apache/beam/issues/28649
>
> [2] https://github.com/apache/beam/issues/28650
> On 9/25/23 18:31, Reuven Lax via dev wrote:
>
>
>
> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:
>
>>
>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>
>> Two separate things here:
>>
>> 1. Yes, a watermark can update in the middle of a bundle.
>> 2. The records in the bundle themselves will prevent the watermark from
>> updating as they are still in flight until after finish bundle. Therefore
>> simply caching the records should always be watermark safe, regardless of
>> the runner. You will only run into problems if you try and move timestamps
>> "backwards" - which is why Beam strongly discourages this.
>>
>> This is not aligned with FlinkRunner's implementation. And I actually
>> think it is not aligned conceptually. As mentioned, Flink does not have
>> the concept of bundles at all. It achieves fault tolerance via
>> checkpointing, essentially checkpoint barrier flowing from sources to
>> sinks, safely snapshotting state of each operator on the way. Bundles are
>> implemented as a somewhat arbitrary set of elements between two consecutive
>> checkpoints (there can be multiple bundles between checkpoints). A bundle
>> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
>> after the checkpoint barrier passes over the elements in the bundle (every
>> bundle is finished at the very latest exactly before a checkpoint). But
>> watermark propagation and bundle finalization are completely unrelated. This
>> might be a bug in the runner, but requiring checkpoint for watermark
>> propagation will introduce insane delays between processing time and
>> watermarks, every executable stage will delay watermark propagation until a
>> checkpoint (which is typically the order of seconds). This delay would add
>> up after each stage.
>>
>
> It's not bundles that hold up processing, rather it is elements, and
> elements are not considered "processed" until FinishBundle.
>
> You are right about Flink. In many cases this is fine - if Flink rolls
> back to the last checkpoint, the watermark will also roll back, and
> everything stays consistent. So in general, one does not need to wait for
> checkpoints for watermark propagation.
>
> Where things get a bit weirder with Flink is whenever one has external
> side effects. In theory, one should wait for checkpoints before letting a
> Sink flush, otherwise one could end up with incorrect outputs (especially
> with a sink like TextIO). Flink itself recognizes this, and that's why they
> provide TwoPhaseCommitSinkFunction, which
> waits for a checkpoint. In Beam, this is the reason we introduced
> RequiresStableInput. Of course in practice many Flink users don't do this -
> in which case they are prioritizing latency over data correctness.
>
>>
>> Reuven
>>
>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>>
>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>> committed, as there's no guarantee that this work won't be discarded.
>>>
>>> There was a thread [1], where the conclusion seemed to be that updating
>>> watermark is possible even in the middle of a bundle. Actually, handling
>>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>>> checkpoints, they are always recomputed from scratch on restore).
>>>
>>> [1] 

Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský

Hi Kenn and Reuven,

I agree with all these points. The only issue here seems to be that 
FlinkRunner does not fulfill these constraints. This is a bug that can 
be fixed, though we need to change some defaults, as 1000 ms default 
bundle "duration" for lower traffic Pipelines can be too much. We are 
also probably missing some @ValidatesRunner tests for this. I created 
[1] and [2] to track this.


One question still remains, the bundle vs. element life-cycle is 
relevant only for cases where processing of element X can affect 
processing of element Y later in the same bundle. Once this influence is 
ruled out (i.e. no caching), this information can result in runner 
optimization that yields better performance. Should we consider 
propagating this information from user code to the runner?


[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:



On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the
watermark from updating as they are still in flight until after
finish bundle. Therefore simply caching the records should always
be watermark safe, regardless of the runner. You will only run
into problems if you try and move timestamps "backwards" - which
is why Beam strongly discourages this.

This is not aligned with FlinkRunner's implementation. And I
actually think it is not aligned conceptually. As mentioned,
Flink does not have the concept of bundles at all. It achieves
fault tolerance via checkpointing, essentially checkpoint barrier
flowing from sources to sinks, safely snapshotting state of each
operator on the way. Bundles are implemented as a somewhat
arbitrary set of elements between two consecutive checkpoints
(there can be multiple bundles between checkpoints). A bundle is
'committed' (i.e. persistently stored and guaranteed not to retry)
only after the checkpoint barrier passes over the elements in the
bundle (every bundle is finished at the very latest exactly before
a checkpoint). But watermark propagation and bundle finalization
are completely unrelated. This might be a bug in the runner, but
requiring checkpoint for watermark propagation will introduce
insane delays between processing time and watermarks, every
executable stage will delay watermark propagation until a
checkpoint (which is typically the order of seconds). This delay
would add up after each stage.


It's not bundles that hold up processing, rather it is elements, and 
elements are not considered "processed" until FinishBundle.


You are right about Flink. In many cases this is fine - if Flink rolls 
back to the last checkpoint, the watermark will also roll back, and 
everything stays consistent. So in general, one does not need to wait 
for checkpoints for watermark propagation.


Where things get a bit weirder with Flink is whenever one has external 
side effects. In theory, one should wait for checkpoints before 
letting a Sink flush, otherwise one could end up with incorrect 
outputs (especially with a sink like TextIO). Flink itself recognizes 
this, and that's why they provide TwoPhaseCommitSinkFunction, which 
waits for a checkpoint. In Beam, this is the reason we introduced 
RequiresStableInput. Of course in practice many Flink users don't do 
this - in which case they are prioritizing latency over data correctness.




Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský 
wrote:

> Watermarks shouldn't be (visibly) advanced until
@FinishBundle is committed, as there's no guarantee that this
work won't be discarded.

There was a thread [1], where the conclusion seemed to be
that updating watermark is possible even in the middle of a
bundle. Actually, handling watermarks is runner-dependent
(e.g. Flink does not store watermarks in checkpoints, they
are always recomputed from scratch on restore).

[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský
 wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it
seems like you're often going to want to put high
fixed cost things like database connections even
  

Re: Runner Bundling Strategies

2023-09-25 Thread Reuven Lax via dev
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially a checkpoint barrier flowing from sources to
> sinks, safely snapshotting the state of each operator on the way. Bundles are
> implemented as a somewhat arbitrary set of elements between two consecutive
> checkpoints (there can be multiple bundles between checkpoints). A bundle
> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
> after the checkpoint barrier passes over the elements in the bundle (every
> bundle is finished at the very latest exactly before a checkpoint). But
> watermark propagation and bundle finalization are completely unrelated. This
> might be a bug in the runner, but requiring a checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks; every executable stage will delay watermark propagation until a
> checkpoint (which is typically on the order of seconds). This delay would add
> up after each stage.
>

It's not bundles that hold up processing, rather it is elements, and
elements are not considered "processed" until FinishBundle.

You are right about Flink. In many cases this is fine - if Flink rolls back
to the last checkpoint, the watermark will also roll back, and everything
stays consistent. So in general, one does not need to wait for checkpoints
for watermark propagation.

Where things get a bit weirder with Flink is whenever one has external side
effects. In theory, one should wait for checkpoints before letting a Sink
flush, otherwise one could end up with incorrect outputs (especially with a
sink like TextIO). Flink itself recognizes this, and that's why they
provide TwoPhaseCommitSinkFunction, which
waits for a checkpoint. In Beam, this is the reason we introduced
RequiresStableInput. Of course in practice many Flink users don't do this -
in which case they are prioritizing latency over data correctness.

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> the watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>>> wrote:
>>>
 I've actually wondered about this specifically for streaming... if
 you're writing a pipeline there it seems like you're often going to want to
 put high fixed cost things like database connections even outside of the
 bundle setup. You really only want to do that once in the lifetime of the
 worker itself, not the bundle. Seems like having that boundary be somewhere
 other than an arbitrarily (and probably small in streaming to avoid
 latency) group of elements might be more useful? I suppose this depends
 heavily on the object lifecycle in the sdk worker though.

>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set a watermark hold of
>>> min(timestamp in bundle), otherwise the watermark might overtake the buffered
>>> elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's 

Re: Runner Bundling Strategies

2023-09-25 Thread Kenneth Knowles
These are some good points. Replies inline.

On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially a checkpoint barrier flowing from sources to
> sinks, safely snapshotting the state of each operator on the way.
>


> Bundles are implemented as a somewhat arbitrary set of elements between
> two consecutive checkpoints (there can be multiple bundles between
> checkpoints)
>

Yes, it is a good point. To align the runner: an input element is not
processed until it has been through @ProcessElement and
@FinishBundle has also been called. Until that happens, the input element
is still "in process" and holds the watermark. This doesn't mean the
watermark is frozen; it only means it is constrained.
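
A rough illustration of the bookkeeping (this is not a Beam API, just the
invariant a runner has to maintain):

import java.util.Collection;
import org.joda.time.Instant;

class WatermarkHolds {
  // The output watermark may advance while a bundle is open, but never
  // past an element that is still in process (i.e. @ProcessElement and
  // @FinishBundle have not both completed for it).
  static Instant outputWatermark(Instant inputWatermark, Collection<Instant> inFlight) {
    Instant hold = inputWatermark;
    for (Instant ts : inFlight) {
      if (ts.isBefore(hold)) {
        hold = ts;  // constrained by the oldest in-flight element
      }
    }
    return hold;
  }
}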


> A bundle is 'committed' (i.e. persistently stored and guaranteed not to
> retry) only after the checkpoint barrier passes over the elements in the
> bundle (every bundle is finished at the very latest exactly before a
> checkpoint).
>

I think this is fine and does not have to be related to bundle processing
or watermarks. Since Flink provides global consistency, any downstream work
that depended on the not-yet-persisted results would also be reset back to
the checkpoint, so it is fine.


> But watermark propagation and bundle finalization are completely unrelated.
> This might be a bug in the runner, but requiring a checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks; every executable stage will delay watermark propagation until a
> checkpoint (which is typically on the order of seconds). This delay would add
> up after each stage.
>

I am aware of this conflict. Interestingly, "requires stable input" is the
case where you must wait until checkpoint finalization, since inputs may
spontaneously change on retry before a checkpoint is finalized. This is not
just a Beam/Flink mismatch; I believe Flink itself cannot correctly
process this kind of data without waiting for checkpoint finalization.

Kenn

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> the watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>>> wrote:
>>>
 I've actually wondered about this specifically for streaming... if
 you're writing a pipeline there it seems like you're often going to want to
 put high fixed cost things like database connections even outside of the
 bundle setup. You really only want to do that once in the lifetime of the
 worker itself, not the bundle. Seems like having that boundary be somewhere
 other than an arbitrarily (and probably small in streaming to avoid
 latency) group of elements might be more useful? I suppose this depends
 heavily on the object lifecycle in the sdk worker though.

>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set a watermark hold of
>>> min(timestamp in bundle), otherwise the watermark might overtake the buffered
>>> elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>>
>>> Users 

Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark 
from updating as they are still in flight until after finish bundle. 
Therefore simply caching the records should always be watermark safe, 
regardless of the runner. You will only run into problems if you try 
and move timestamps "backwards" - which is why Beam strongly 
discourages this.
This is not aligned with FlinkRunner's implementation. And I actually 
think it is not aligned conceptually. As mentioned, Flink does not have 
the concept of bundles at all. It achieves fault tolerance via 
checkpointing, essentially a checkpoint barrier flowing from sources to 
sinks, safely snapshotting the state of each operator on the way. Bundles 
are implemented as a somewhat arbitrary set of elements between two 
consecutive checkpoints (there can be multiple bundles between 
checkpoints). A bundle is 'committed' (i.e. persistently stored and 
guaranteed not to retry) only after the checkpoint barrier passes over 
the elements in the bundle (every bundle is finished at the very latest 
exactly before a checkpoint). But watermark propagation and bundle 
finalization are completely unrelated. This might be a bug in the runner, 
but requiring a checkpoint for watermark propagation will introduce insane 
delays between processing time and watermarks; every executable stage 
will delay watermark propagation until a checkpoint (which is typically 
on the order of seconds). This delay would add up after each stage.


Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> Watermarks shouldn't be (visibly) advanced until @FinishBundle
is committed, as there's no guarantee that this work won't be
discarded.

There was a thread [1], where the conclusion seemed to be that
updating the watermark is possible even in the middle of a bundle.
Actually, handling watermarks is runner-dependent (e.g. Flink does
not store watermarks in checkpoints, they are always recomputed
from scratch on restore).

[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský 
wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems
like you're often going to want to put high fixed cost
things like database connections even outside of the
bundle setup. You really only want to do that once in
the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an
arbitrarily (and probably small in streaming to avoid
latency) group of elements might be more useful? I
suppose this depends heavily on the object lifecycle in
the sdk worker though.


+1. This is the difference between @Setup and @StartBundle.
The start/finish bundle operations should be used for
bracketing element processing that must be committed as a
unit for correct failure recovery (e.g. if elements are
cached in ProcessElement, they should all be emitted in
FinishBundle). On the other hand, things like open database
connections can and likely should be shared across bundles.

This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to
manually set a watermark hold of min(timestamp in bundle),
otherwise the watermark might overtake the buffered elements.


Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be
discarded.

Users don't have any option other than using
timer.withOutputTimestamp for that, as we don't have a
user-facing API to set a watermark hold otherwise; thus the
in-bundle caching implies a stateful DoFn. The question might
then be: why not use "classical" stateful caching involving
state, as there is full control over the caching in user
code? This gave me an idea: would it be useful to add
information about caching to the API (e.g. in Java
@StartBundle(caching=true))? That could maybe solve the above
issues (the runner would know to set the hold, and it could
work with "stateless" DoFns).


Really, this is one of the areas where the streaming/batch 
abstraction leaks. In batch it was a common pattern to have local
DoFn instance state that persisted from start to finish bundle,
and these were also used as convenient entry points for other
operations 

Re: Runner Bundling Strategies

2023-09-23 Thread Reuven Lax via dev
Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark from
updating as they are still in flight until after finish bundle. Therefore
simply caching the records should always be watermark safe, regardless of
the runner. You will only run into problems if you try and move timestamps
"backwards" - which is why Beam strongly discourages this.

Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
> There was a thread [1], where the conclusion seemed to be that updating
> the watermark is possible even in the middle of a bundle. Actually, handling
> watermarks is runner-dependent (e.g. Flink does not store watermarks in
> checkpoints, they are always recomputed from scratch on restore).
>
> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>
>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>> wrote:
>>
>>> I've actually wondered about this specifically for streaming... if
>>> you're writing a pipeline there it seems like you're often going to want to
>>> put high fixed cost things like database connections even outside of the
>>> bundle setup. You really only want to do that once in the lifetime of the
>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>> other than an arbitrarily (and probably small in streaming to avoid
>>> latency) group of elements might be more useful? I suppose this depends
>>> heavily on the object lifecycle in the sdk worker though.
>>>
>>
>> +1. This is the difference between @Setup and @StartBundle. The
>> start/finish bundle operations should be used for bracketing element
>> processing that must be committed as a unit for correct failure recovery
>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>> in FinishBundle). On the other hand, things like open database connections
>> can and likely should be shared across bundles.
>>
>> This is correct, but the caching between @StartBundle and @FinishBundle
>> has some problems. First, users need to manually set a watermark hold of
>> min(timestamp in bundle), otherwise the watermark might overtake the buffered
>> elements.
>>
>
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
>
>
>> Users don't have any option other than using timer.withOutputTimestamp for
>> that, as we don't have a user-facing API to set a watermark hold otherwise;
>> thus the in-bundle caching implies a stateful DoFn. The question might then
>> be: why not use "classical" stateful caching involving state, as there is
>> full control over the caching in user code? This gave me an idea: would it
>> be useful to add information about caching to the API (e.g. in
>> Java @StartBundle(caching=true))? That could maybe solve the above issues
>> (the runner would know to set the hold, and it could work with "stateless" DoFns).
>>
>
> Really, this is one of the areas where the streaming/batch abstraction
> leaks. In batch it was a common pattern to have local DoFn instance state
> that persisted from start to finish bundle, and these were also used as
> convenient entry points for other operations (like opening
> database connections) 'cause bundles were often "as large as possible."
> With the advent of streaming it makes sense to put this in
> explicitly managed runner state to allow for cross-bundle amortization and
> there's more value in distinguishing between @Setup and @StartBundle.
>
> (Were I to do things over, I'd probably encourage an API that discouraged
> non-configuration instance state on DoFns altogether, e.g. in the style of
> Python context managers (and an equivalent API could probably be put
> together with AutoCloseables in Java) one would have something like
>
> ParDo(X)
>
> which would logically (though not necessarily physically) lead to an
> execution like
>
> with X.bundle_processor() as bundle_processor:
>   for bundle in bundles:
>     with bundle_processor.element_processor() as process:
>       for element in bundle:
>         process(element)
>
> where the traditional setup/start_bundle/finish_bundle/teardown logic
> would live in the __enter__ and __exit__ methods (made even easier with
> coroutines.) For convenience one could of course provide a raw bundle
> processor or element processor to ParDo if the enter/exit contexts are
> trivial. But this is getting somewhat off-topic...
>
>
>>
>>>
>>> Best,
>>> B
>>>
>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>>>
 (I notice that you replied only to yourself, but there has been a whole
 thread of discussion on this 

Re: Runner Bundling Strategies

2023-09-23 Thread Jan Lukavský
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is 
committed, as there's no guarantee that this work won't be discarded.


There was a thread [1], where the conclusion seemed to be that updating 
the watermark is possible even in the middle of a bundle. Actually, handling 
watermarks is runner-dependent (e.g. Flink does not store watermarks in 
checkpoints, they are always recomputed from scratch on restore).


[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems like
you're often going to want to put high fixed cost things like
database connections even outside of the bundle setup. You
really only want to do that once in the lifetime of the
worker itself, not the bundle. Seems like having that
boundary be somewhere other than an arbitrarily (and probably
small in streaming to avoid latency) group of elements might
be more useful? I suppose this depends heavily on the object
lifecycle in the sdk worker though.


+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing
element processing that must be committed as a unit for
correct failure recovery (e.g. if elements are cached in
ProcessElement, they should all be emitted in FinishBundle). On
the other hand, things like open database connections can and
likely should be shared across bundles.

This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to manually set
a watermark hold of min(timestamp in bundle), otherwise the watermark
might overtake the buffered elements.


Watermarks shouldn't be (visibly) advanced until @FinishBundle is 
committed, as there's no guarantee that this work won't be discarded.


Users don't have any option other than using timer.withOutputTimestamp
for that, as we don't have a user-facing API to set a watermark hold
otherwise; thus the in-bundle caching implies a stateful DoFn. The
question might then be: why not use "classical" stateful caching
involving state, as there is full control over the caching in user
code? This gave me an idea: would it be useful to add
information about caching to the API (e.g. in Java
@StartBundle(caching=true))? That could maybe solve the above issues
(the runner would know to set the hold, and it could work with
"stateless" DoFns).


Really, this is one of the areas where the streaming/batch abstraction 
leaks. In batch it was a common pattern to have local DoFn instance 
state that persisted from start to finish bundle, and these were also 
used as convenient entry points for other operations (like opening 
database connections) 'cause bundles were often "as large as 
possible." WIth the advent of n streaming it makes sense to put this 
in explicitly managed runner state to allow for cross-bundle 
amortization and there's more value in distinguishing between @Setup 
and @StartBundle.


(Were I to do things over, I'd probably encourage an API that 
discouraged non-configuration instance state on DoFns altogether, e.g. 
in the style of Python context managers (and an equivalent API could 
probably be put together with AutoCloseables in Java) one would have 
something like


ParDo(X)

which would logically (though not necessarily physically) lead to an 
execution like


with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)

where the traditional setup/start_bundle/finish_bundle/teardown logic 
would live in the __enter__ and __exit__ methods (made even easier 
with coroutines.) For convenience one could of course provide a raw 
bundle processor or element processor to ParDo if the enter/exit 
contexts are trivial. But this is getting somewhat off-topic...




Best,
B

On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
 wrote:

(I notice that you replied only to yourself, but there
has been a whole thread of discussion on this - are you
subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the
biggest bundles possible.

So for bounded data, basically you make even splits of
the data and each split is one bundle. And then dynamic
splitting to redistribute work to eliminate stragglers,
if your engine has that 

Re: Runner Bundling Strategies

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:

> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
> wrote:
>
>> I've actually wondered about this specifically for streaming... if you're
>> writing a pipeline there it seems like you're often going to want to put
>> high fixed cost things like database connections even outside of the bundle
>> setup. You really only want to do that once in the lifetime of the worker
>> itself, not the bundle. Seems like having that boundary be somewhere other
>> than an arbitrarily (and probably small in streaming to avoid latency)
>> group of elements might be more useful? I suppose this depends heavily on
>> the object lifecycle in the sdk worker though.
>>
>
> +1. This is the difference between @Setup and @StartBundle. The
> start/finish bundle operations should be used for bracketing element
> processing that must be committed as a unit for correct failure recovery
> (e.g. if elements are cached in ProcessElement, they should all be emitted
> in FinishBundle). On the other hand, things like open database connections
> can and likely should be shared across bundles.
>
> This is correct, but the caching between @StartBundle and @FinishBundle
> has some problems. First, users need to manually set a watermark hold of
> min(timestamp in bundle), otherwise the watermark might overtake the buffered
> elements.
>

Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be discarded.


> Users don't have any option other than using timer.withOutputTimestamp for
> that, as we don't have a user-facing API to set a watermark hold otherwise;
> thus the in-bundle caching implies a stateful DoFn. The question might then
> be: why not use "classical" stateful caching involving state, as there is
> full control over the caching in user code? This gave me an idea: would it
> be useful to add information about caching to the API (e.g. in Java
> @StartBundle(caching=true))? That could maybe solve the above issues
> (the runner would know to set the hold, and it could work with "stateless" DoFns).
>

Really, this is one of the areas where the streaming/batch abstraction
leaks. In batch it was a common pattern to have local DoFn instance state
that persisted from start to finish bundle, and these were also used as
convenient entry points for other operations (like opening
database connections) 'cause bundles were often "as large as possible."
With the advent of streaming it makes sense to put this in
explicitly managed runner state to allow for cross-bundle amortization and
there's more value in distinguishing between @Setup and @StartBundle.

(Were I to do things over, I'd probably encourage an API that discouraged
non-configuration instance state on DoFns altogether, e.g. in the style of
Python context managers (and an equivalent API could probably be put
together with AutoCloseables in Java) one would have something like

ParDo(X)

which would logically (though not necessarily physically) lead to an
execution like

with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)

where the traditional setup/start_bundle/finish_bundle/teardown logic would
live in the __enter__ and __exit__ methods (made even easier with
coroutines.) For convenience one could of course provide a raw bundle
processor or element processor to ParDo if the enter/exit contexts are
trivial. But this is getting somewhat off-topic...
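
For concreteness, a hedged sketch of what the Java flavor could look like;
every type here is hypothetical, just mirroring the Python pseudocode above.

// Hypothetical API sketch; none of these types exist in Beam today.
interface ElementProcessor<T> extends AutoCloseable {
  void process(T element);   // per-element logic
  @Override void close();    // plays the role of finish_bundle
}

interface BundleProcessor<T> extends AutoCloseable {
  ElementProcessor<T> elementProcessor();  // plays the role of start_bundle
  @Override void close();                  // plays the role of teardown
}

class Driver {
  static <T> void run(BundleProcessor<T> bp, Iterable<? extends Iterable<T>> bundles) {
    try (BundleProcessor<T> scoped = bp) {  // setup ... teardown
      for (Iterable<T> bundle : bundles) {
        try (ElementProcessor<T> p = scoped.elementProcessor()) {  // start ... finish
          for (T element : bundle) {
            p.process(element);
          }
        }
      }
    }
  }
}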


>
>>
>> Best,
>> B
>>
>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>>
>>> (I notice that you replied only to yourself, but there has been a whole
>>> thread of discussion on this - are you subscribed to dev@beam?
>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>
>>> It sounds like you want what everyone wants: to have the biggest bundles
>>> possible.
>>>
>>> So for bounded data, basically you make even splits of the data and each
>>> split is one bundle. And then dynamic splitting to redistribute work to
>>> eliminate stragglers, if your engine has that capability.
>>>
>>> For unbounded data, you more-or-less bundle as much as you can without
>>> waiting too long, like Jan described.
>>>
>>> Users know to put their high fixed costs in @StartBundle and then it is
>>> the runner's job to put as many calls to @ProcessElement as possible to
>>> amortize.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
>>> wrote:
>>>
 Whoops, I typoed my last email. I meant to write "this isn't the
 greatest strategy for high *fixed* cost transforms", e.g. a transform
 that takes 5 minutes to get set up and then maybe a microsecond per input

 I suppose one solution is to move the responsibility for handling this
 kind of situation to the user and expect users to use a bundling transform

Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský


On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
 wrote:


I've actually wondered about this specifically for streaming... if
you're writing a pipeline there it seems like you're often going
to want to put high fixed cost things like database connections
even outside of the bundle setup. You really only want to do that
once in the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an arbitrarily
(and probably small in streaming to avoid latency) group of
elements might be more useful? I suppose this depends heavily on
the object lifecycle in the sdk worker though.


+1. This is the difference between @Setup and @StartBundle. The 
start/finish bundle operations should be used for bracketing element 
processing that must be committed as a unit for 
correct failure recovery (e.g. if elements are cached in 
ProcessElement, they should all be emitted in FinishBundle). On the 
other hand, things like open database connections can and likely 
should be shared across bundles.
This is correct, but the caching between @StartBundle and @FinishBundle 
has some problems. First, users need to manually set a watermark hold of 
min(timestamp in bundle), otherwise the watermark might overtake the 
buffered elements. Users don't have any option other than using 
timer.withOutputTimestamp for that, as we don't have a user-facing API 
to set a watermark hold otherwise; thus the in-bundle caching implies a 
stateful DoFn. The question might then be: why not use "classical" 
stateful caching involving state, as there is full control over the 
caching in user code? This gave me an idea: would it be useful to add 
information about caching to the API (e.g. in Java 
@StartBundle(caching=true))? That could maybe solve the above issues 
(the runner would know to set the hold, and it could work with 
"stateless" DoFns).
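
To make the floated idea concrete, a sketch (note: @StartBundle takes no
such parameter in Beam today, so this does not compile; it is purely the
proposal being described):

// Hypothetical: the annotation would tell the runner that elements may
// be buffered until @FinishBundle, so the runner itself could hold the
// watermark at min(timestamp in bundle), even for a "stateless" DoFn.
class DeclaredCachingFn extends DoFn<String, String> {
  @StartBundle(caching = true)  // hypothetical parameter
  public void startBundle() {
    // allocate the in-bundle cache
  }
}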



Best,
B

On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles 
wrote:

(I notice that you replied only to yourself, but there has
been a whole thread of discussion on this - are you subscribed
to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the
biggest bundles possible.

So for bounded data, basically you make even splits of the
data and each split is one bundle. And then dynamic splitting
to redistribute work to eliminate stragglers, if your engine
has that capability.

For unbounded data, you more-or-less bundle as much as you can
without waiting too long, like Jan described.

Users know to put their high fixed costs in @StartBundle and
then it is the runner's job to put as many calls
to @ProcessElement as possible to amortize.

Kenn

On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
 wrote:

Whoops, I typoed my last email. I meant to write "this
isn't the greatest strategy for high *fixed* cost
transforms", e.g. a transform that takes 5 minutes to get
set up and then maybe a microsecond per input

I suppose one solution is to move the responsibility for
handling this kind of situation to the user and expect
users to use a bundling transform (e.g. BatchElements [1])
followed by a Reshuffle+FlatMap. Is this what other
runners expect? Just want to make sure I'm not missing
some smart generic bundling strategy that might handle
this for users.

[1]

https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements



On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
 wrote:

Writing a runner and the first strategy for
determining bundling size was to just start with a
bundle size of one and double it until we reach a size
 that we expect to take some target per-bundle runtime
(e.g. maybe 10 minutes). I realize that this isn't the
greatest strategy for high sized cost transforms. I'm
curious what kind of strategies other runners take?


Re: Runner Bundling Strategies

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
wrote:

> I've actually wondered about this specifically for streaming... if you're
> writing a pipeline there it seems like you're often going to want to put
> high fixed cost things like database connections even outside of the bundle
> setup. You really only want to do that once in the lifetime of the worker
> itself, not the bundle. Seems like having that boundary be somewhere other
> than an arbitrarily (and probably small in streaming to avoid latency)
> group of elements might be more useful? I suppose this depends heavily on
> the object lifecycle in the sdk worker though.
>

+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing element
processing that must be committed as a unit for correct failure recovery
(e.g. if elements are cached in ProcessElement, they should all be emitted
in FinishBundle). On the other hand, things like open database connections
can and likely should be shared across bundles.
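
To make the bracketing pattern concrete, a minimal Java sketch (the element
type is illustrative): nothing is emitted from @ProcessElement, and
@FinishBundle re-emits every buffered element with its original timestamp
and window, so a retried bundle replays cleanly.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class BufferingFn extends DoFn<String, String> {
  private static class Buffered {
    final String value;
    final Instant timestamp;
    final BoundedWindow window;

    Buffered(String value, Instant timestamp, BoundedWindow window) {
      this.value = value;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient List<Buffered> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void process(@Element String e, @Timestamp Instant ts, BoundedWindow w) {
    buffer.add(new Buffered(e, ts, w));  // buffer only; emit nothing here
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    for (Buffered b : buffer) {
      c.output(b.value, b.timestamp, b.window);  // original timestamp/window kept
    }
    buffer = null;
  }
}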


>
> Best,
> B
>
> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:
>
>> (I notice that you replied only to yourself, but there has been a whole
>> thread of discussion on this - are you subscribed to dev@beam?
>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>
>> It sounds like you want what everyone wants: to have the biggest bundles
>> possible.
>>
>> So for bounded data, basically you make even splits of the data and each
>> split is one bundle. And then dynamic splitting to redistribute work to
>> eliminate stragglers, if your engine has that capability.
>>
>> For unbounded data, you more-or-less bundle as much as you can without
>> waiting too long, like Jan described.
>>
>> Users know to put their high fixed costs in @StartBundle and then it is
>> the runner's job to put as many calls to @ProcessElement as possible to
>> amortize.
>>
>> Kenn
>>
>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
>> wrote:
>>
>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>> greatest strategy for high *fixed* cost transforms", e.g. a transform
>>> that takes 5 minutes to get set up and then maybe a microsecond per input
>>>
>>> I suppose one solution is to move the responsibility for handling this
>>> kind of situation to the user and expect users to use a bundling transform
>>> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
>>> other runners expect? Just want to make sure I'm not missing some smart
>>> generic bundling strategy that might handle this for users.
>>>
>>> [1]
>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>
>>>
>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran 
>>> wrote:
>>>
 Writing a runner and the first strategy for determining bundling size
 was to just start with a bundle size of one and double it until we reach a
 size that we expect to take some target per-bundle runtime (e.g. maybe 10
 minutes). I realize that this isn't the greatest strategy for high sized
 cost transforms. I'm curious what kind of strategies other runners take?

>>>


Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský
Flink operators are long-running classes with a life-cycle of open() and 
close(), so any amortization can be done between those methods; see [1]. 
Essentially, in vanilla Flink the complete (unbounded) input can be viewed 
as a single "bundle". The crucial point is that state is checkpointed 
while this "bundle" is open.


 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/internals/task_lifecycle/
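
For completeness, the way the FlinkRunner ties Beam bundles to this
life-cycle can be sketched roughly as follows (modeled loosely on its
DoFnOperator; invokeFinishBundle() is an illustrative name):

// Inside a Flink operator that wraps a Beam DoFn: close the in-flight
// bundle before the operator state is snapshotted, so a restored
// checkpoint never contains a half-finished bundle.
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  invokeFinishBundle();          // finish the current bundle first...
  super.snapshotState(context);  // ...then snapshot operator state
}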


On 9/22/23 15:21, Kenneth Knowles wrote:
What is the best way to amortize heavy operations across elements in 
Flink? (that is what bundles are for, basically)


On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský  wrote:

Flink defines bundles in terms of number of elements and
processing time, by default 1000 elements or 1000 milliseconds,
whichever happens first. But bundles are not a "natural" concept in
Flink; it uses them merely to comply with the Beam model. By
default, checkpoints are unaligned with bundles.

 Jan

On 9/22/23 01:58, Robert Bradshaw via dev wrote:

Dataflow uses a work-stealing protocol. The FnAPI has a protocol
to ask the worker to stop at a certain element that has already
been sent.

On Thu, Sep 21, 2023 at 4:24 PM Joey Tran
 wrote:

Writing a runner and the first strategy for determining
bundling size was to just start with a bundle size of one and
double it until we reach a size that we expect to take some
target per-bundle runtime (e.g. maybe 10 minutes). I realize
that this isn't the greatest strategy for high sized cost
transforms. I'm curious what kind of strategies other runners
take?


Re: Runner Bundling Strategies

2023-09-22 Thread Joey Tran
Ah! Thanks for that catch. I had subscribed to the user mailing list but
forgot to ever sub to the dev list

On Fri, Sep 22, 2023 at 10:03 AM Kenneth Knowles  wrote:

> (I notice that you replied only to yourself, but there has been a whole
> thread of discussion on this - are you subscribed to dev@beam?
> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>
> It sounds like you want what everyone wants: to have the biggest bundles
> possible.
>
> So for bounded data, basically you make even splits of the data and each
> split is one bundle. And then dynamic splitting to redistribute work to
> eliminate stragglers, if your engine has that capability.
>
> For unbounded data, you more-or-less bundle as much as you can without
> waiting too long, like Jan described.
>
> Users know to put their high fixed costs in @StartBundle and then it is
> the runner's job to put as many calls to @ProcessElement as possible to
> amortize.
>
> Kenn
>
> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
> wrote:
>
>> Whoops, I typoed my last email. I meant to write "this isn't the
>> greatest strategy for high *fixed* cost transforms", e.g. a transform
>> that takes 5 minutes to get set up and then maybe a microsecond per input
>>
>> I suppose one solution is to move the responsibility for handling this
>> kind of situation to the user and expect users to use a bundling transform
>> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
>> other runners expect? Just want to make sure I'm not missing some smart
>> generic bundling strategy that might handle this for users.
>>
>> [1]
>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>
>>
>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran 
>> wrote:
>>
>>> Writing a runner and the first strategy for determining bundling size
>>> was to just start with a bundle size of one and double it until we reach a
>>> size that we expect to take some target per-bundle runtime (e.g. maybe 10
>>> minutes). I realize that this isn't the greatest strategy for high sized
>>> cost transforms. I'm curious what kind of strategies other runners take?
>>>
>>


Re: Runner Bundling Strategies

2023-09-22 Thread Byron Ellis via dev
I've actually wondered about this specifically for streaming... if you're
writing a pipeline there it seems like you're often going to want to put
high fixed cost things like database connections even outside of the bundle
setup. You really only want to do that once in the lifetime of the worker
itself, not the bundle. Seems like having that boundary be somewhere other
than an arbitrarily (and probably small in streaming to avoid latency)
group of elements might be more useful? I suppose this depends heavily on
the object lifecycle in the sdk worker though.

Best,
B

On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles  wrote:

> (I notice that you replied only to yourself, but there has been a whole
> thread of discussion on this - are you subscribed to dev@beam?
> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>
> It sounds like you want what everyone wants: to have the biggest bundles
> possible.
>
> So for bounded data, basically you make even splits of the data and each
> split is one bundle. And then dynamic splitting to redistribute work to
> eliminate stragglers, if your engine has that capability.
>
> For unbounded data, you more-or-less bundle as much as you can without
> waiting too long, like Jan described.
>
> Users know to put their high fixed costs in @StartBundle and then it is
> the runner's job to put as many calls to @ProcessElement as possible to
> amortize.
>
> Kenn
>
> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran 
> wrote:
>
>> Whoops, I typoed my last email. I meant to write "this isn't the
>> greatest strategy for high *fixed* cost transforms", e.g. a transform
>> that takes 5 minutes to get set up and then maybe a microsecond per input
>>
>> I suppose one solution is to move the responsibility for handling this
>> kind of situation to the user and expect users to use a bundling transform
>> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
>> other runners expect? Just want to make sure I'm not missing some smart
>> generic bundling strategy that might handle this for users.
>>
>> [1]
>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>
>>
>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran 
>> wrote:
>>
>>> Writing a runner and the first strategy for determining bundling size
>>> was to just start with a bundle size of one and double it until we reach a
>>> size that we expect to take some target per-bundle runtime (e.g. maybe 10
>>> minutes). I realize that this isn't the greatest strategy for high sized
>>> cost transforms. I'm curious what kind of strategies other runners take?
>>>
>>


Re: Runner Bundling Strategies

2023-09-22 Thread Kenneth Knowles
(I notice that you replied only to yourself, but there has been a whole
thread of discussion on this - are you subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the biggest bundles
possible.

So for bounded data, basically you make even splits of the data and each
split is one bundle. And then dynamic splitting to redistribute work to
eliminate stragglers, if your engine has that capability.

For unbounded data, you more-or-less bundle as much as you can without
waiting too long, like Jan described.

Users know to put their high fixed costs in @StartBundle and then it is the
runner's job to put as many calls to @ProcessElement as possible to
amortize.

Kenn

On Fri, Sep 22, 2023 at 9:39 AM Joey Tran  wrote:

> Whoops, I typoed my last email. I meant to write "this isn't the
> greatest strategy for high *fixed* cost transforms", e.g. a transform
> that takes 5 minutes to get set up and then maybe a microsecond per input
>
> I suppose one solution is to move the responsibility for handling this
> kind of situation to the user and expect users to use a bundling transform
> (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what
> other runners expect? Just want to make sure I'm not missing some smart
> generic bundling strategy that might handle this for users.
>
> [1]
> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>
>
> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran 
> wrote:
>
>> Writing a runner and the first strategy for determining bundling size was
>> to just start with a bundle size of one and double it until we reach a size
>> that we expect to take some target per-bundle runtime (e.g. maybe 10
>> minutes). I realize that this isn't the greatest strategy for high sized
>> cost transforms. I'm curious what kind of strategies other runners take?
>>
>


Re: Runner Bundling Strategies

2023-09-22 Thread Joey Tran
Whoops, I typoed my last email. I meant to write "this isn't the
greatest strategy for high *fixed* cost transforms", e.g. a transform that
takes 5 minutes to get set up and then maybe a microsecond per input

I suppose one solution is to move the responsibility for handling this kind
of situation to the user and expect users to use a bundling transform (e.g.
BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what other
runners expect? Just want to make sure I'm not missing some smart generic
bundling strategy that might handle this for users.

[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
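
(For the Java SDK, the closest analogue is GroupIntoBatches; a sketch,
assuming a keyed input named keyedInput:)

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Sketch: batch explicitly so the high fixed cost is paid once per batch
// of 100 rather than once per runner-chosen bundle. 'keyedInput' is an
// assumed PCollection<KV<String, String>>.
PCollection<KV<String, Iterable<String>>> batches =
    keyedInput.apply(GroupIntoBatches.<String, String>ofSize(100));
// A downstream DoFn then pays the expensive setup once per Iterable<String>.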


On Thu, Sep 21, 2023 at 7:23 PM Joey Tran  wrote:

> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle size of one and double it until we reach a size
> that we expect to take some target per-bundle runtime (e.g. maybe 10
> minutes). I realize that this isn't the greatest strategy for high sized
> cost transforms. I'm curious what kind of strategies other runners take?
>
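
For reference, the doubling strategy quoted above, as a sketch (all names
here are made up; this is not a Beam API):

// Illustrative sketch only; moreInput, takeElements and processBundle
// are stand-ins for whatever the runner actually provides.
void runAdaptively() {
  long bundleSize = 1;
  final double targetSeconds = 600.0;  // e.g. a ~10 minute per-bundle target

  while (moreInput()) {
    long startNanos = System.nanoTime();
    processBundle(takeElements(bundleSize));  // execute one bundle
    double elapsed = (System.nanoTime() - startNanos) / 1e9;
    if (elapsed < targetSeconds) {
      bundleSize *= 2;  // too fast: amortize fixed costs over more elements
    }
  }
}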


Re: Runner Bundling Strategies

2023-09-22 Thread Kenneth Knowles
What is the best way to amortize heavy operations across elements in Flink?
(that is what bundles are for, basically)

On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský  wrote:

> Flink defines bundles in terms of number of elements and processing time,
> by default 1000 elements or 1000 milliseconds, whichever happens first. But
> bundles are not a "natural" concept in Flink; it uses them merely to comply
> with the Beam model. By default, checkpoints are unaligned with bundles.
>
>  Jan
> On 9/22/23 01:58, Robert Bradshaw via dev wrote:
>
> Dataflow uses a work-stealing protocol. The FnAPI has a protocol to ask
> the worker to stop at a certain element that has already been sent.
>
> On Thu, Sep 21, 2023 at 4:24 PM Joey Tran 
> wrote:
>
>> Writing a runner and the first strategy for determining bundling size was
>> to just start with a bundle size of one and double it until we reach a size
>> that we expect to take some target per-bundle runtime (e.g. maybe 10
>> minutes). I realize that this isn't the greatest strategy for high sized
>> cost transforms. I'm curious what kind of strategies other runners take?
>>
>


Re: Runner Bundling Strategies

2023-09-22 Thread Jan Lukavský
Flink defines bundles in terms of number of elements and processing 
time, by default 1000 elements or 1000 milliseconds, whichever happens 
first. But bundles are not a "natural" concept in Flink; it uses them 
merely to comply with the Beam model. By default, checkpoints are 
unaligned with bundles.


 Jan

On 9/22/23 01:58, Robert Bradshaw via dev wrote:
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to 
ask the worker to stop at a certain element that has already been sent.


On Thu, Sep 21, 2023 at 4:24 PM Joey Tran  
wrote:


Writing a runner and the first strategy for determining bundling
size was to just start with a bundle size of one and double it
until we reach a size that we expect to take some target
per-bundle runtime (e.g. maybe 10 minutes). I realize that this
isn't the greatest strategy for high sized cost transforms. I'm
curious what kind of strategies other runners take?


Re: Runner Bundling Strategies

2023-09-21 Thread Robert Bradshaw via dev
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to ask the
worker to stop at a certain element that has already been sent.

On Thu, Sep 21, 2023 at 4:24 PM Joey Tran  wrote:

> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle size of one and double it until we reach a size
> that we expect to take some target per-bundle runtime (e.g. maybe 10
> minutes). I realize that this isn't the greatest strategy for high sized
> cost transforms. I'm curious what kind of strategies other runners take?
>


Re: Runner Bundling Strategies

2023-09-21 Thread Robert Burke
Depends entirely on the use case really.

Currently, for the Prism runner I'm working on for the Go SDK, the strategy
is "bundles are the size of ready data", which does OK at keeping latency
low for downstream transforms. It will also tell the SDK to split bundles
if an element takes longer than 200 milliseconds to process.

Dataflow Batch jobs will generally start with extremely large bundle sizes
and then use channel splitting and Sub Element splitting to divide work
further than the initial splits. This is basically the opposite of your
initial strategy.

Dataflow streaming tends to do hundreds of single-element bundles per
worker to reduce processing latency.

I can't speak to the Flink and Spark strategies.

Robert Burke
Beam Go Busybody

On Thu, Sep 21, 2023, 4:24 PM Joey Tran  wrote:

> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle size of one and double it until we reach a size
> that we expect to take some target per-bundle runtime (e.g. maybe 10
> minutes). I realize that this isn't the greatest strategy for high sized
> cost transforms. I'm curious what kind of strategies other runners take?
>