On Wed, Sep 27, 2023 at 2:53 PM Robert Bradshaw via dev
wrote:
On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev
wrote:
> DoFns are allowed to be non-deterministic, so they don't have to yield the
> "same" output.
>
Yeah. I'm more thinking here that there's a set of outputs that are
considered equivalently valid.
> The example I'm thinking of is where
Understood, thanks. This is fairly unintuitive from the "checkpoint
barrier" viewpoint, because when such a runner fails, it simply restarts
from the checkpoint as if it were a fresh start, i.e. calling Setup.
It makes sense that a bundle-based runner might not do that.
It seems to follow
Using Setup would cause data loss in this case. A runner can always retry a
bundle, and I don't believe Setup is called again in this case. If the user
initialized the hashmap in Setup, records would be completely lost whenever
bundles are retried.
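A minimal, hypothetical simulation of that failure mode, assuming a runner that retries a failed bundle on the same DoFn instance without calling Setup again. The class and function names are illustrative only, not Beam APIs. A dedup set that survives across bundle attempts (created in `setup`) silently drops the retried attempt's records; one that is reset in `start_bundle` does not.

```python
from typing import Iterable, List


class DedupInSetup:
    """Hypothetical best-effort dedup whose 'seen' set is created once, in setup()."""

    def setup(self) -> None:
        self.seen = set()

    def start_bundle(self) -> None:
        self.out: List[str] = []

    def process(self, element: str) -> None:
        if element not in self.seen:
            self.seen.add(element)
            self.out.append(element)

    def finish_bundle(self) -> List[str]:
        return self.out


class DedupInStartBundle(DedupInSetup):
    """Same logic, but the 'seen' set is reset at the start of every bundle attempt."""

    def start_bundle(self) -> None:
        self.seen = set()
        super().start_bundle()


def run_with_retry(dofn: DedupInSetup, bundle: Iterable[str], fail_after: int) -> List[str]:
    """Call setup() once, fail the first attempt after `fail_after` elements, then
    retry the whole bundle on the same instance without calling setup() again."""
    elements = list(bundle)
    dofn.setup()
    # First attempt: processes a prefix, then "crashes"; its output is discarded.
    dofn.start_bundle()
    for e in elements[:fail_after]:
        dofn.process(e)
    # Retry: same DoFn instance, setup() is NOT called again.
    dofn.start_bundle()
    for e in elements:
        dofn.process(e)
    return dofn.finish_bundle()
```

For example, `run_with_retry(DedupInSetup(), ["a", "b", "a", "c"], fail_after=2)` returns only `["c"]`, because `"a"` and `"b"` were marked as seen by the discarded first attempt, while `DedupInStartBundle` returns `["a", "b", "c"]`.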
On Wed, Sep 27, 2023 at 11:20 AM Jan
What is the reason to rely on StartBundle and not Setup in this case? If
the life-cycle of a bundle is not "closed" (i.e. start to finish), then it
seems to be ill-defined, and Setup should suffice?
I'm trying to think of non-caching use-cases of
StartBundle-FinishBundle. Are there such cases? I'd say
DoFns are allowed to be non-deterministic, so they don't have to yield the
"same" output.
The example I'm thinking of is where users perform some "best-effort"
deduplication by creating a hashmap in StartBundle and removing duplicates.
This is usually done purely for performance to reduce shuffle
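A sketch of that best-effort pattern, simulated in plain Python rather than with a real DoFn: each bundle gets a fresh hashmap (the StartBundle step), duplicate keys within the bundle are collapsed, and the map is flushed before the bundle ends (the FinishBundle step). The function name and the (key, value) element shape are assumptions for illustration. Duplicates that land in different bundles still get through, so anything downstream that needs exact results must deduplicate again; the per-bundle map only shrinks what goes into the shuffle.

```python
from typing import Dict, Hashable, List, Sequence, Tuple

Element = Tuple[Hashable, str]


def best_effort_dedup(bundles: Sequence[Sequence[Element]]) -> List[Element]:
    """Hypothetical per-bundle dedup; returns what would be sent to the shuffle."""
    emitted: List[Element] = []
    for bundle in bundles:
        # "StartBundle": a fresh hashmap for every bundle.
        seen: Dict[Hashable, str] = {}
        for key, value in bundle:
            # "ProcessElement": keep only the first value seen for each key.
            seen.setdefault(key, value)
        # "FinishBundle": flush the map so nothing is held past the bundle.
        emitted.extend(seen.items())
    return emitted
```

With the bundles `[("k1", "x"), ("k1", "x"), ("k2", "y")]` and `[("k1", "x"), ("k3", "z")]`, five input records become four: the in-bundle duplicate is dropped, the cross-bundle one is not.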
On Tue, Sep 26, 2023 at 10:33 AM Reuven Lax via dev
wrote:
> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>
No reason we couldn't rectify this now. (We'd need to do it in such a way
that the absence of such an annotation isn't inferred as an incorrect value,
of course.)
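One way to get that property, sketched in Python under the assumption that the runner sees the annotation as an optional field: keep "not specified" distinct from an explicit false, and only relax behavior on an explicit value. This matters because a plain proto3 `bool` field that is absent reads as `false`, which is exactly the "incorrect value" an older SDK would appear to send. `ParDoInfo` and `requires_finish_bundle` are hypothetical names, not fields of the real `ParDoPayload`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParDoInfo:
    """Hypothetical runner-side view of a ParDo (not a real Beam message)."""
    # None: the SDK did not say, e.g. it predates the annotation.
    requires_finish_bundle: Optional[bool] = None


def may_assume_no_finish_bundle(info: ParDoInfo) -> bool:
    """Trust only an explicit False; treat a missing value conservatively."""
    return info.requires_finish_bundle is False
```

Even an explicit `False` would only say that FinishBundle does no work; it would not mean that elements in one bundle can't affect later ones, since the DoFn may still cache across elements.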
On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
wrote:
Yes, not including FinishBundle in ParDoPayload seems like a mistake.
Though absence of FinishBundle doesn't mean that one can assume that
elements in a bundle don't affect subsequent bundle elements (i.e. there
might still be caching!)
On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles wrote:
>
>
Oh neat, Preserving Keys. I didn't think we had/provided a mechanism for
declaring that.
Good doc.
I do know there's no annotation for FinishBundle. It's generally optional
and only an SDK-side concern.
Finalize Bundle is a different mechanism which does have an annotation, and
requires both
On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský wrote:
Hi Kenn and Reuven,
I agree with all these points. The only issue here seems to be that
FlinkRunner does not fulfill these constraints. This is a bug that can
be fixed, though we need to change some defaults, as the 1000 ms default
bundle "duration" can be too long for lower-traffic pipelines. We
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský wrote:
>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still
These are some good points. Replies inline.
On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský wrote:
>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the
On 9/23/23 18:16, Reuven Lax via dev wrote:
Two separate things here:
1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark from
updating as they are still in flight until after finish bundle. Therefore
simply caching the records should always be watermark safe,
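A toy model of the second point, assuming the watermark a stage publishes is the minimum of its input watermark and the timestamps of elements whose bundle has not yet committed (i.e. FinishBundle has not completed). The function name and numbers are illustrative; real runners track holds per key, window, and timer rather than as a single list. Under this model the input watermark may move mid-bundle, but cached elements keep the published watermark from passing them until the bundle commits.

```python
from typing import Iterable


def published_watermark(input_watermark: float,
                        uncommitted_timestamps: Iterable[float]) -> float:
    """Hypothetical: in-flight (uncommitted) elements hold the watermark back."""
    return min([input_watermark, *uncommitted_timestamps])


# A bundle caches two elements while the input watermark advances mid-bundle.
cached = [12.0, 15.0]
input_wm = 20.0  # advanced from 10.0 to 20.0 in the middle of the bundle
before_commit = published_watermark(input_wm, cached)  # held at 12.0
cached.clear()  # FinishBundle flushed the cache and the bundle committed
after_commit = published_watermark(input_wm, cached)  # now 20.0
```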
> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
> committed, as there's no guarantee that this work won't be discarded.
There was a thread [1] where the conclusion seemed to be that updating the
watermark is possible even in the middle of a bundle. Actually, handling
watermarks
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský wrote:
On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
wrote:
I've actually wondered about this specifically for streaming... if
you're writing a pipeline there it seems like you're often going
to want to put high fixed cost things
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
wrote:
> I've actually wondered about this specifically for streaming... if you're
> writing a pipeline there it seems like you're often going to want to put
> high fixed cost things like database connections even outside of the bundle
> setup.
Flink operators are long-running classes with a life-cycle of open() and
close(), so any amortization can be done between those methods (see [1]).
Essentially, in vanilla Flink the complete (unbounded) input can be viewed
as a single "bundle". The crucial point is that state is
Ah! Thanks for that catch. I had subscribed to the user mailing list but
forgot to ever sub to the dev list
On Fri, Sep 22, 2023 at 10:03 AM Kenneth Knowles wrote:
> (I notice that you replied only to yourself, but there has been a whole
> thread of discussion on this - are you subscribed to
I've actually wondered about this specifically for streaming... if you're
writing a pipeline there it seems like you're often going to want to put
high fixed cost things like database connections even outside of the bundle
setup. You really only want to do that once in the lifetime of the worker
(I notice that you replied only to yourself, but there has been a whole
thread of discussion on this - are you subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what everyone wants: to have the biggest bundles
possible.
So for
Whoops, I typoed my last email. I meant to write "this isn't the
greatest strategy for high *fixed* cost transforms", e.g. a transform that
takes 5 minutes to get set up and then maybe a microsecond per input
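To put numbers on that example (5 minutes of setup, roughly a microsecond per element), assume the fixed cost is paid once per bundle, e.g. if it lived in StartBundle. The average cost per element is then fixed/n + per_element for a bundle of n elements, so the setup only stops dominating once n reaches fixed/per_element. The function names are illustrative.

```python
def amortized_cost_s(fixed_s: float, per_element_s: float, n: int) -> float:
    """Average seconds per element when `fixed_s` is paid once per bundle of n."""
    return fixed_s / n + per_element_s


def break_even_bundle_size(fixed_s: float, per_element_s: float) -> float:
    """Bundle size at which the amortized setup equals the per-element cost."""
    return fixed_s / per_element_s


FIXED_S = 5 * 60.0    # 5 minutes of setup
PER_ELEMENT_S = 1e-6  # about a microsecond per input

cost_at_1000 = amortized_cost_s(FIXED_S, PER_ELEMENT_S, 1000)  # ~0.300001 s per element
break_even = break_even_bundle_size(FIXED_S, PER_ELEMENT_S)   # ~3e8 elements
```

With 1000-element bundles, the 5-minute setup costs about 0.3 s per element, more than five orders of magnitude above the per-element work; parity arrives only around 300 million elements per setup, which is the case for keeping such costs out of per-bundle code.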
I suppose one solution is to move the responsibility for handling this kind
of situation
What is the best way to amortize heavy operations across elements in Flink?
(that is what bundles are for, basically)
On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský wrote:
Flink defines bundles in terms of number of elements and processing
time, by default 1000 elements or 1000 milliseconds, whichever happens
first. But bundles are not a "natural" concept in Flink; it uses them
merely to comply with the Beam model. By default, checkpoints are
unaligned with
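A sketch of such a count-or-time bundle policy, with the 1000-element / 1000 ms defaults described above. `BundleTrigger` is a hypothetical name, not the FlinkRunner's actual class, and the clock is injectable so the logic can be tested. In a real runner `should_finish()` would also be polled from a timer, so that a bundle in a low-traffic pipeline closes on time even when no further element arrives; with a 1000 ms limit, such a bundle can still wait up to a second.

```python
import time
from typing import Callable, Optional


class BundleTrigger:
    """Finish a bundle after max_elements or max_time_ms, whichever comes first."""

    def __init__(self, max_elements: int = 1000, max_time_ms: float = 1000.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_elements = max_elements
        self.max_time_ms = max_time_ms
        self.clock = clock  # returns seconds
        self.count = 0
        self.started: Optional[float] = None

    def on_element(self) -> bool:
        """Record one element; return True if the bundle should be finished now."""
        if self.started is None:
            self.started = self.clock()  # the bundle starts with its first element
        self.count += 1
        return self.should_finish()

    def should_finish(self) -> bool:
        """Also meant to be polled by a timer when no elements are arriving."""
        if self.started is None:
            return False
        elapsed_ms = (self.clock() - self.started) * 1000.0
        return self.count >= self.max_elements or elapsed_ms >= self.max_time_ms

    def reset(self) -> None:
        """Call after FinishBundle to begin the next bundle."""
        self.count = 0
        self.started = None
```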
Dataflow uses a work-stealing protocol. The FnAPI has a protocol to ask the
worker to stop at a certain element that has already been sent.
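A simplified sketch of how a runner might pick the element at which to stop a worker, assuming elements are addressed by index within the bundle, the element currently being processed is never split, and `keep_fraction` is the share of not-yet-started elements the worker keeps. In the FnAPI this exchange is the `ProcessBundleSplitRequest`, which carries more detail than this (including splits within a single element's restriction); `split_point` and its parameters are hypothetical.

```python
def split_point(current_index: int, total_sent: int, keep_fraction: float) -> int:
    """Index of the first element handed back to the runner.

    Elements [0, split) stay with the worker; [split, total_sent) become the
    residual that another worker can steal.
    """
    if not 0 <= current_index < total_sent:
        raise ValueError("current_index must refer to an element already sent")
    if not 0.0 <= keep_fraction <= 1.0:
        raise ValueError("keep_fraction must be in [0, 1]")
    # Elements already sent to the worker but not yet started.
    remaining = total_sent - (current_index + 1)
    kept = int(remaining * keep_fraction)  # round down: give away the extra one
    return current_index + 1 + kept
```

With 100 elements sent and the tenth (index 9) in progress, `keep_fraction=0.5` keeps elements 0 to 54 and hands back 55 to 99; `keep_fraction=0.0` stops right after the current element, which amounts to a checkpoint.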
On Thu, Sep 21, 2023 at 4:24 PM Joey Tran wrote:
> Writing a runner and the first strategy for determining bundling size was
> to just start with a bundle
Depends entirely on the use case really.
Currently, the strategy for the Prism runner I'm working on for the Go SDK is
"bundles are the size of ready data", which will do OK at keeping latency low
for downstream transforms. It will also tell the SDK to split bundles if an
element takes longer than
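A minimal sketch of the "bundles are the size of ready data" policy: each new bundle takes whatever is ready at that moment, so low-traffic stages get small bundles immediately rather than waiting on a count or time threshold. The excerpt is cut off before the split threshold, so `split_threshold_s` below is a hypothetical parameter, and neither function is Prism's actual code.

```python
from collections import deque
from typing import Deque, List, TypeVar

T = TypeVar("T")


def next_bundle(ready: Deque[T]) -> List[T]:
    """Drain everything currently ready into a single bundle."""
    bundle = list(ready)
    ready.clear()
    return bundle


def should_request_split(element_elapsed_s: float, split_threshold_s: float) -> bool:
    """Ask the SDK to split the bundle if one element runs longer than the threshold."""
    return element_elapsed_s > split_threshold_s
```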