Re: Runner Bundling Strategies

2023-09-27 Thread Kenneth Knowles
On Wed, Sep 27, 2023 at 2:53 PM Robert Bradshaw via dev wrote:
> On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev wrote:
>> DoFns are allowed to be non deterministic, so they don't have to yield
>> the "same" output.
>
> Yeah. I'm more thinking here that there's a set of outputs that

2023-09-27 Thread Robert Bradshaw via dev
On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev wrote:
> DoFns are allowed to be non deterministic, so they don't have to yield the
> "same" output.

Yeah. I'm more thinking here that there's a set of outputs that are considered equivalently valid.

> The example I'm thinking of is where

2023-09-27 Thread Jan Lukavský
Understood, thanks. This is fairly unintuitive from the "checkpoint barrier" viewpoint, because when such a runner fails, it simply restarts from the checkpoint as if it were a fresh start, i.e. calling Setup. It makes sense that a bundle-based runner might not do that. It seems to follow

2023-09-27 Thread Reuven Lax via dev
Using Setup would cause data loss in this case. A runner can always retry a bundle, and I don't believe Setup is called again when it does. If the user initialized the hashmap in Setup, records would be lost entirely whenever a bundle is retried.

On Wed, Sep 27, 2023 at 11:20 AM Jan
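The data-loss scenario described in this message can be simulated in a few lines. This is a minimal sketch in plain Python, not the Beam API: `DedupFn` and `run_with_retry` are hypothetical names, and the simulated "runner" discards the output of a failed attempt and replays the same bundle on the same instance without calling `setup()` again, which is the behavior assumed in the message.

```python
from typing import Callable, List, Set


class DedupFn:
    """Hypothetical stand-in for a DoFn doing best-effort deduplication.

    init_in_setup selects where the 'seen' set is created: in setup()
    (once per instance) or in start_bundle() (once per bundle attempt).
    """

    def __init__(self, init_in_setup: bool) -> None:
        self.init_in_setup = init_in_setup
        self.seen: Set[str] = set()

    def setup(self) -> None:
        if self.init_in_setup:
            self.seen = set()

    def start_bundle(self) -> None:
        if not self.init_in_setup:
            self.seen = set()

    def process(self, element: str, emit: Callable[[str], None]) -> None:
        # Emit only elements not already present in the current 'seen' set.
        if element not in self.seen:
            self.seen.add(element)
            emit(element)


def run_with_retry(fn: DedupFn, bundle: List[str], fail_after: int) -> List[str]:
    """Fail the first attempt after `fail_after` elements, then retry the bundle.

    Output of the failed attempt is discarded; setup() runs only once.
    Returns the output of the successful (committed) retry.
    """
    fn.setup()
    discarded: List[str] = []
    fn.start_bundle()
    for element in bundle[:fail_after]:
        fn.process(element, discarded.append)
    # Simulated failure here: nothing from the first attempt is committed.
    committed: List[str] = []
    fn.start_bundle()
    for element in bundle:
        fn.process(element, committed.append)
    return committed


bundle = ["a", "b", "a", "c"]
print(run_with_retry(DedupFn(init_in_setup=True), bundle, fail_after=2))   # ['c']
print(run_with_retry(DedupFn(init_in_setup=False), bundle, fail_after=2))  # ['a', 'b', 'c']
```

With the set created in `setup()`, "a" and "b" are remembered from the discarded attempt and silently dropped on retry; recreating it in `start_bundle()` makes each attempt independent.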

2023-09-27 Thread Jan Lukavský
What is the reason to rely on StartBundle and not Setup in this case? If the life-cycle of a bundle is not "closed" (i.e. start to finish), then it seems ill-defined, and shouldn't Setup suffice? I'm trying to think of non-caching use cases of StartBundle/FinishBundle; are there such cases? I'd say

2023-09-27 Thread Reuven Lax via dev
DoFns are allowed to be non-deterministic, so they don't have to yield the "same" output. The example I'm thinking of is where users perform some "best-effort" deduplication by creating a hashmap in StartBundle and removing duplicates. This is usually done purely for performance, to reduce shuffle
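To illustrate why a per-bundle hashmap only gives "best-effort" deduplication, here is a minimal plain-Python sketch (the function name is hypothetical; bundles are modeled as lists). Duplicates are dropped only within a bundle, so the output, and how much data reaches the shuffle, depends on where the runner draws bundle boundaries.

```python
from typing import Iterable, List, Set


def best_effort_dedup(bundles: Iterable[List[str]]) -> List[str]:
    """Drop duplicates within each bundle only.

    The 'seen' set is recreated per bundle (as if in StartBundle), so
    duplicates that span two bundles are still emitted.
    """
    out: List[str] = []
    for bundle in bundles:
        seen: Set[str] = set()  # fresh for every bundle
        for element in bundle:
            if element not in seen:
                seen.add(element)
                out.append(element)
    return out


# Same six inputs, different bundle boundaries.
print(best_effort_dedup([["a", "a", "b"], ["b", "c", "c"]]))  # ['a', 'b', 'b', 'c']
print(best_effort_dedup([["a", "a", "b", "b", "c", "c"]]))    # ['a', 'b', 'c']
```

Both outputs are acceptable under a model that allows non-deterministic DoFns; the second bundling simply sends less data downstream, and any step that needs exact uniqueness must still deduplicate on its own.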

2023-09-26 Thread Robert Bradshaw via dev
On Tue, Sep 26, 2023 at 10:33 AM Reuven Lax via dev wrote:
> Yes, not including FinishBundle in ParDoPayload seems like a mistake.

No reason we couldn't rectify this now. (We'd need to do it in such a way that the absence of such an annotation isn't inferred as an incorrect value of course.)
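One way to keep a missing annotation from being read as "no" is to treat the flag as tri-state, where `None` means the SDK never said anything. The plain-Python sketch below assumes that design; `ParDoInfo` and `has_finish_bundle` are hypothetical names, not fields of the real ParDoPayload proto.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParDoInfo:
    """Hypothetical runner-side view of a ParDo's declared properties."""

    # None: the SDK did not say (e.g. it predates the field).
    # True/False: the SDK explicitly declared the property.
    has_finish_bundle: Optional[bool] = None


def declares_no_finish_bundle(info: ParDoInfo) -> bool:
    """True only for an explicit False; an absent value stays 'unknown'."""
    return info.has_finish_bundle is False


print(declares_no_finish_bundle(ParDoInfo()))                         # False (unknown)
print(declares_no_finish_bundle(ParDoInfo(has_finish_bundle=False)))  # True
print(declares_no_finish_bundle(ParDoInfo(has_finish_bundle=True)))   # False
```

As noted elsewhere in the thread, even an explicit "no FinishBundle" does not prove that elements within a bundle are independent, so a runner should be careful about what it infers from such a flag.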

2023-09-26 Thread Kenneth Knowles
On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev wrote:
> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
> Though absence of FinishBundle doesn't mean that one can assume that
> elements in a bundle don't affect subsequent bundle elements (i.e. there
> might still be

2023-09-26 Thread Reuven Lax via dev
Yes, not including FinishBundle in ParDoPayload seems like a mistake. Though absence of FinishBundle doesn't mean that one can assume that elements in a bundle don't affect subsequent bundle elements (i.e. there might still be caching!)

On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles wrote:

2023-09-26 Thread Robert Burke
Oh neat, Preserving Keys. I didn't think we had or provided a mechanism for declaring that. Good doc. I do know there's no annotation for FinishBundle. It's generally optional and only a concern on the SDK side. Finalize Bundle is a different mechanism that does have an annotation, and requires both

2023-09-26 Thread Kenneth Knowles
On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský wrote:
> Hi Kenn and Reuven,
>
> I agree with all these points. The only issue here seems to be that
> FlinkRunner does not fulfill these constraints. This is a bug that can be
> fixed, though we need to change some defaults, as 1000 ms default bundle

2023-09-25 Thread Jan Lukavský
Hi Kenn and Reuven,

I agree with all these points. The only issue here seems to be that FlinkRunner does not fulfill these constraints. This is a bug that can be fixed, though we need to change some defaults, as the 1000 ms default bundle "duration" can be too long for lower-traffic pipelines. We

2023-09-25 Thread Reuven Lax via dev
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský wrote:
>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still

2023-09-25 Thread Kenneth Knowles
These are some good points. Replies inline.

On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský wrote:
>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the

2023-09-25 Thread Jan Lukavský
On 9/23/23 18:16, Reuven Lax via dev wrote:
> Two separate things here:
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from updating as they are still in flight until after finish bundle. Therefore simply caching the

2023-09-23 Thread Reuven Lax via dev
Two separate things here:
1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark from updating, as they are still in flight until after finish bundle. Therefore, simply caching the records should always be watermark-safe,
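The second point can be read as a simple rule: the output watermark may not pass the earliest timestamp still in flight. The plain-Python sketch below uses a hypothetical function name and float timestamps, and assumes records cached within a bundle count as in flight until FinishBundle commits; real runners track such holds in their own ways.

```python
from typing import Iterable


def output_watermark(input_watermark: float,
                     in_flight_timestamps: Iterable[float]) -> float:
    """Watermark held back by records not yet committed.

    Records buffered in an open bundle (e.g. cached until FinishBundle)
    hold the watermark at the earliest of their timestamps.
    """
    return min([input_watermark, *in_flight_timestamps])


cached = [40.0, 70.0]  # event timestamps of records buffered in the bundle
print(output_watermark(100.0, cached))  # 40.0: held by the cached records
print(output_watermark(100.0, []))      # 100.0: bundle committed, hold released
```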

2023-09-23 Thread Jan Lukavský
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.

There was a thread [1] where the conclusion seemed to be that updating the watermark is possible even in the middle of a bundle. Actually, handling watermarks

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský wrote:
> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>
> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev wrote:
>> I've actually wondered about this specifically for streaming... if you're
>> writing a pipeline there it seems like

2023-09-22 Thread Jan Lukavský
On 9/22/23 18:07, Robert Bradshaw via dev wrote:
> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev wrote:
>> I've actually wondered about this specifically for streaming... if you're writing a pipeline there it seems like you're often going to want to put high fixed cost things

2023-09-22 Thread Robert Bradshaw via dev
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev wrote:
> I've actually wondered about this specifically for streaming... if you're
> writing a pipeline there it seems like you're often going to want to put
> high fixed cost things like database connections even outside of the bundle
> setup.

2023-09-22 Thread Jan Lukavský
Flink operators are long-running classes with a life-cycle of open() and close(), so any amortization can be done between those methods; see [1]. Essentially, in vanilla Flink the complete (unbounded) input can be viewed as a single "bundle". The crucial point is that state is

2023-09-22 Thread Joey Tran
Ah! Thanks for that catch. I had subscribed to the user mailing list but never subscribed to the dev list.

On Fri, Sep 22, 2023 at 10:03 AM Kenneth Knowles wrote:
> (I notice that you replied only to yourself, but there has been a whole
> thread of discussion on this - are you subscribed to

2023-09-22 Thread Byron Ellis via dev
I've actually wondered about this specifically for streaming: if you're writing a pipeline there, it seems like you're often going to want to put high-fixed-cost things like database connections even outside of the bundle setup. You really only want to do that once in the lifetime of the worker
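A common way to get "once per worker" rather than "once per DoFn instance" is a process-wide, lazily initialized resource. This is a minimal plain-Python sketch; `PerWorkerResource` and `open_connection` are hypothetical, and Beam SDKs may provide their own facilities for sharing objects across DoFn instances.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerWorkerResource(Generic[T]):
    """Create a resource at most once per process and share it.

    Every DoFn instance or bundle on this worker that calls get()
    receives the same object, so the fixed cost is paid once.
    """

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._value: Optional[T] = None
        self._created = False

    def get(self) -> T:
        # The lock ensures concurrent callers (e.g. bundles processed on
        # different threads) do not each run the expensive factory.
        with self._lock:
            if not self._created:
                self._value = self._factory()
                self._created = True
        return self._value  # type: ignore[return-value]


# Hypothetical expensive setup, e.g. opening a database connection.
opened: list = []


def open_connection() -> str:
    opened.append(1)
    return "connection"


CONNECTION = PerWorkerResource(open_connection)
# Simulate several bundles on several threads all asking for the resource.
threads = [threading.Thread(target=CONNECTION.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(CONNECTION.get(), len(opened))  # connection 1
```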

2023-09-22 Thread Kenneth Knowles
(I notice that you replied only to yourself, but there has been a whole thread of discussion on this. Are you subscribed to dev@beam? https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

It sounds like you want what everyone wants: to have the biggest bundles possible. So for

2023-09-22 Thread Joey Tran
Whoops, I typoed my last email. I meant to write "this isn't the greatest strategy for high *fixed* cost transforms", e.g. a transform that takes 5 minutes to get set up and then maybe a microsecond per input.

I suppose one solution is to move the responsibility for handling this kind of situation
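The arithmetic behind this is simple amortization. Assuming the fixed cost F is paid once per bundle of n elements and each element costs c, the average cost per element is F/n + c. The sketch below plugs in the numbers from the email (F = 300 s, c = 1 µs); the function name is hypothetical.

```python
def amortized_cost_per_element(fixed_s: float, per_element_s: float, n: int) -> float:
    """Average seconds per element when fixed_s is paid once per n elements."""
    return fixed_s / n + per_element_s


FIXED_S = 5 * 60       # 5 minutes of setup
PER_ELEMENT_S = 1e-6   # about a microsecond per input

for n in (1, 1_000, 1_000_000):
    cost = amortized_cost_per_element(FIXED_S, PER_ELEMENT_S, n)
    print(f"bundle size {n:>9}: {cost:.6f} s/element")
```

Under this assumption, even a million-element bundle leaves the setup dominating the per-element cost (about 300 µs of setup versus 1 µs of work), which is why such transforms want the fixed cost paid far less often than once per bundle.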

2023-09-22 Thread Kenneth Knowles
What is the best way to amortize heavy operations across elements in Flink? (That is basically what bundles are for.)

On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský wrote:
> Flink defines bundles in terms of number of elements and processing time,
> by default 1000 elements or 1000 milliseconds,

2023-09-22 Thread Jan Lukavský
Flink defines bundles in terms of number of elements and processing time: by default 1000 elements or 1000 milliseconds, whichever comes first. But bundles are not a "natural" concept in Flink; it uses them merely to comply with the Beam model. By default, checkpoints are unaligned with
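The count-or-time rule can be sketched as follows. This is an illustrative plain-Python model, not FlinkRunner code: the class name is hypothetical, the clock is injectable so it can be tested, and `poll()` stands in for the timer a real runner needs so that a bundle in a quiet stream is closed without waiting for another element.

```python
import time
from typing import Callable, List, Optional


class CountOrTimeBundler:
    """Close a bundle after max_elements or max_millis, whichever comes first."""

    def __init__(self, max_elements: int = 1000, max_millis: float = 1000.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_elements = max_elements
        self.max_millis = max_millis
        self.clock = clock  # returns seconds
        self.current: List[object] = []
        self.started_at = 0.0

    def _elapsed_ms(self) -> float:
        return (self.clock() - self.started_at) * 1000.0

    def _flush(self) -> List[object]:
        bundle, self.current = self.current, []
        return bundle

    def add(self, element: object) -> Optional[List[object]]:
        """Add an element; return the finished bundle if a limit was reached."""
        if not self.current:
            self.started_at = self.clock()  # bundle starts with its first element
        self.current.append(element)
        if len(self.current) >= self.max_elements or self._elapsed_ms() >= self.max_millis:
            return self._flush()
        return None

    def poll(self) -> Optional[List[object]]:
        """Called periodically: close the open bundle if its time is up."""
        if self.current and self._elapsed_ms() >= self.max_millis:
            return self._flush()
        return None


now = [0.0]
bundler = CountOrTimeBundler(max_elements=3, max_millis=1000.0, clock=lambda: now[0])
for x in "abc":
    done = bundler.add(x)
print(done)            # ['a', 'b', 'c']: closed by the count limit
bundler.add("d")
now[0] = 1.5           # 1.5 s later, no new elements have arrived
print(bundler.poll())  # ['d']: closed by the time limit
```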

2023-09-21 Thread Robert Bradshaw via dev
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to ask the worker to stop at a certain element that has already been sent.

On Thu, Sep 21, 2023 at 4:24 PM Joey Tran wrote:
> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle
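Stopping at an element that has already been sent can be pictured as splitting the bundle into a primary part the worker keeps and a residual part the runner reschedules elsewhere. This is a simplified plain-Python model with hypothetical names; it does not reproduce the actual FnAPI split messages, only the constraint that the element currently being processed cannot be given away.

```python
from typing import List, Optional, Tuple


def split_bundle(elements: List[str], current_index: int,
                 fraction_of_remainder: float) -> Optional[Tuple[List[str], List[str]]]:
    """Choose a stop point in a bundle the worker has already received.

    The worker is processing elements[current_index], so the earliest legal
    stop is just after it. fraction_of_remainder (0.0 to 1.0) says how much
    of the not-yet-started work the worker keeps. Returns (primary, residual),
    or None if nothing can be split off.
    """
    first_unstarted = current_index + 1
    remaining = len(elements) - first_unstarted
    keep = int(remaining * fraction_of_remainder)
    stop = first_unstarted + keep
    if stop >= len(elements):
        return None  # nothing left to hand to another worker
    return elements[:stop], elements[stop:]


bundle = list("abcdefghij")
primary, residual = split_bundle(bundle, current_index=2, fraction_of_remainder=0.5)
print("".join(primary), "".join(residual))  # abcdef ghij
```

In a work-stealing setup, the residual becomes new work that an idle worker can pick up, while the original worker finishes only the primary.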

2023-09-21 Thread Robert Burke
It depends entirely on the use case, really. Currently, the strategy for the Prism runner I'm working on for the Go SDK is "bundles are the size of ready data", which does OK at keeping latency low for downstream transforms. It will also tell the SDK to split bundles if an element takes longer than