Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-29 Thread Jan Lukavský
Going a little further, instead of the CombineFn in (b), we might try to 
solve the problem of incorporating iterations into the model. Iterations 
(backloops) working without event-time timers (i.e. processing time timers 
only or no timers at all) should not interfere with watermarks and 
therefore would not create the problem of "vector watermarks". The 
Throttle transform would then use the backloop as a feedback loop to 
slow down the request rate.


On 2/29/24 14:57, Jan Lukavský wrote:
From my understanding Flink rate limits based on local information 
only. On the other hand - in case of Flink - this should easily extend 
to global information, because the parallelism for both batch and 
streaming is set before job is launched and remains unchanged (until 
possible manual rescaling). There is a possibility of adaptive 
scheduling [1] which would then probably require communication of the 
parallelism to workers (I'd guess this is not implemented).


Regarding the other points - I'd be in favor of the following:

 a) batch timers - trying to extend the current definition of 
processing time timers to batch without introducing a new primitive, so 
in an extended, backwards compatible way (presumably mostly 
terminating condition?)


 b) we could define a CombineFn that would accumulate data from 
workers and provide accumulated results in defined tumbling windows 
back to workers - this could be reused for Throttle, watermark 
alignment, and probably others
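
A minimal sketch of idea (b), written in plain Python rather than against the Beam SDK. The class mirrors the four-method shape of a Beam CombineFn (create_accumulator, add_input, merge_accumulators, extract_output) but does not inherit from it. The window size, the (worker_id, timestamp, count) report format, and every name below are assumptions made for illustration only; how the combined result would travel back to workers is not shown.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # assumed tumbling window size


def window_start(timestamp, size=WINDOW_SECONDS):
    """Start of the tumbling window that contains `timestamp`."""
    return timestamp - (timestamp % size)


class RequestCountCombiner:
    """Hypothetical combiner shaped like a CombineFn; sums counts per window."""

    def create_accumulator(self):
        return defaultdict(int)  # window start -> total requests

    def add_input(self, acc, report):
        _worker_id, timestamp, count = report
        acc[window_start(timestamp)] += count
        return acc

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            for start, total in acc.items():
                merged[start] += total
        return merged

    def extract_output(self, acc):
        return dict(sorted(acc.items()))


combiner = RequestCountCombiner()

# Each worker accumulates its own reports locally.
worker_a = combiner.create_accumulator()
for report in [("a", 1, 5), ("a", 12, 7)]:
    worker_a = combiner.add_input(worker_a, report)

worker_b = combiner.create_accumulator()
for report in [("b", 3, 4), ("b", 15, 1)]:
    worker_b = combiner.add_input(worker_b, report)

# Merging the partial accumulators yields the global per-window totals.
global_counts = combiner.extract_output(
    combiner.merge_accumulators([worker_a, worker_b]))
print(global_counts)
```

The merged per-window totals are what a Throttle or watermark-alignment consumer would read back; the same accumulator shape could carry other aggregates.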


Best,

 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/



On 2/28/24 19:37, Robert Burke wrote:
Sounds like a different variation is either new timer types with 
those distinctions in mind, or additional configuration for 
ProcessingTime timers (defaulting to current behavior) to sort out 
those cases. Could potentially be extended to EventTime timers too 
for explicitly handling looping timer cases (eg. To signal: This 
DoFn's OnWindowExpiry method manages the consequences of this timer's 
effect of a Drain. Or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)


I got curious and looked loosely at how Flink solves this problem: 
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/


In short, an explicit rate limiting strategy. The surface glance 
indicates that it relies on local in memory state, but actual use of 
these things seems relegated to abstract classes (eg for Sinks and 
similar). It's not clear to me whether there is cross worker 
coordination happening there, or it's assumed to be all on a single 
machine anyway. I'm unfamiliar with how Flink operates, so I can't say.


I think I'd be happiest if we could build into Beam a mechanism / 
paired primitive where such a Cross Worker Communication Pair (the 
processor/server + DoFn client) could be built, but not purely be 
limited to Rate limiting/Throttling. Possibly mumble mumble 
StatePipe? But that feels like a harder problem for the time being.


Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:

On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles 
 wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
 wrote:
I can't act on something yet [...] but I expect to be able to 
[...] at some time in the processing-time future.
I like this as a clear and internally-consistent feature 
description. It describes ProcessContinuation and those timers 
which serve the same purpose as ProcessContinuation.


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
 wrote:
I can't think of a batch or streaming scenario where it would 
be correct to not wait at least that long
The main reason we created timers: to take action in the absence 
of data. The archetypal use case for processing time timers 
was/is "flush data from state if it has been sitting there too 
long". For this use case, the right behavior for batch is to 
skip the timer. It is actually basically incorrect to wait.
Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-29 Thread Jan Lukavský
From my understanding Flink rate limits based on local information 
only. On the other hand - in case of Flink - this should easily extend 
to global information, because the parallelism for both batch and 
streaming is set before job is launched and remains unchanged (until 
possible manual rescaling). There is a possibility of adaptive 
scheduling [1] which would then probably require communication of the 
parallelism to workers (I'd guess this is not implemented).


Regarding the other points - I'd be in favor of the following:

 a) batch timers - trying to extend the current definition of 
processing time timers to batch without introducing a new primitive, so 
in an extended, backwards compatible way (presumably mostly terminating 
condition?)


 b) we could define a CombineFn that would accumulate data from workers 
and provide accumulated results in defined tumbling windows back to 
workers - this could be reused for Throttle, watermark alignment, and 
probably others


Best,

 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/



On 2/28/24 19:37, Robert Burke wrote:

Sounds like a different variation is either new timer types with those 
distinctions in mind, or additional configuration for ProcessingTime timers 
(defaulting to current behavior) to sort out those cases. Could potentially be 
extended to EventTime timers too for explicitly handling looping timer cases 
(eg. To signal: This DoFn's OnWindowExpiry method manages the consequences of 
this timer's effect of a Drain. Or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)

I got curious and looked loosely at how Flink solves this problem:  
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. The surface glance indicates that 
it relies on local in memory state, but actual use of these things seems 
relegated to abstract classes (eg for Sinks and similar). It's not clear to me 
whether there is cross worker coordination happening there, or it's assumed to 
be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I 
can't say.

I think I'd be happiest if we could build into Beam a mechanism / paired 
primitive where such a Cross Worker Communication Pair (the processor/server + 
DoFn client) could be built, but not purely be limited to Rate 
limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a 
harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:

On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.

+1



Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-28 Thread Robert Burke
Sounds like a different variation is either new timer types with those 
distinctions in mind, or additional configuration for ProcessingTime timers 
(defaulting to current behavior) to sort out those cases. Could potentially be 
extended to EventTime timers too for explicitly handling looping timer cases 
(eg. To signal: This DoFn's OnWindowExpiry method manages the consequences of 
this timer's effect of a Drain. Or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)

I got curious and looked loosely at how Flink solves this problem:  
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. The surface glance indicates that 
it relies on local in memory state, but actual use of these things seems 
relegated to abstract classes (eg for Sinks and similar). It's not clear to me 
whether there is cross worker coordination happening there, or it's assumed to 
be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I 
can't say.

I think I'd be happiest if we could build into Beam a mechanism / paired 
primitive where such a Cross Worker Communication Pair (the processor/server + 
DoFn client) could be built, but not purely be limited to Rate 
limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a 
harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:
> 
> On 2/27/24 19:49, Robert Bradshaw via dev wrote:
> > On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:
> >> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> >>> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
>  Pulling out focus points:
> 
>  On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
>   wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at 
> > some time in the processing-time future.
>  I like this as a clear and internally-consistent feature description. It 
>  describes ProcessContinuation and those timers which serve the same 
>  purpose as ProcessContinuation.
> 
>  On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
>   wrote:
> > I can't think of a batch or streaming scenario where it would be 
> > correct to not wait at least that long
>  The main reason we created timers: to take action in the absence of 
>  data. The archetypal use case for processing time timers was/is "flush 
>  data from state if it has been sitting there too long". For this use 
>  case, the right behavior for batch is to skip the timer. It is actually 
>  basically incorrect to wait.
> >>> Good point calling out the distinction between "I need to wait in case
> >>> there's more data." and "I need to wait for something external." We
> >>> can't currently distinguish between the two, but a batch runner can
> >>> say something definitive about the first. Feels like we need a new
> >>> primitive (or at least new signaling information on our existing
> >>> primitive).
> >> Runners signal end of data to a DoFn via (input) watermark. Is there a
> >> need for additional information?
> > Yes, and I agree that watermarks/event timestamps are a much better
> > way to track data completeness (if possible).
> >
> > Unfortunately processing timers don't specify if they're waiting for
> > additional data or external/environmental change, meaning we can't use
> > the (event time) watermark to determine whether they're safe to
> > trigger.
> +1
> 


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-28 Thread Jan Lukavský



On 2/27/24 19:49, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.

+1


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Reuven Lax via dev
On Tue, Feb 27, 2024 at 10:22 AM Robert Bradshaw via dev <
dev@beam.apache.org> wrote:

> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
> >
> > Pulling out focus points:
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't act on something yet [...] but I expect to be able to [...] at
> some time in the processing-time future.
> >
> > I like this as a clear and internally-consistent feature description. It
> describes ProcessContinuation and those timers which serve the same purpose
> as ProcessContinuation.
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > > I can't think of a batch or streaming scenario where it would be
> correct to not wait at least that long
> >
> > The main reason we created timers: to take action in the absence of
> data. The archetypal use case for processing time timers was/is "flush data
> from state if it has been sitting there too long". For this use case, the
> right behavior for batch is to skip the timer. It is actually basically
> incorrect to wait.
>
> Good point calling out the distinction between "I need to wait in case
> there's more data." and "I need to wait for something external." We
> can't currently distinguish between the two, but a batch runner can
> say something definitive about the first. Feels like we need a new
> primitive (or at least new signaling information on our existing
> primitive).
>
> BTW the first is also relevant to drain. One reason drain often takes a
long time today is because it has to wait for processing-time timers to
fire (it has to wait because those timers have watermark holds), but
usually those timers are noops.


> > On Fri, Feb 23, 2024 at 3:54 PM Robert Burke 
> wrote:
> > > It doesn't require a new primitive.
> >
> > IMO what's being proposed *is* a new primitive. I think it is a good
> primitive. It is the underlying primitive to ProcessContinuation. It would
> be user-friendly as a kind of timer. But if we made this the behavior of
> processing time timers retroactively, it would break everyone using them to
> flush data who is also reprocessing data.
> >
> > There's two very different use cases ("I need to wait, and block data"
> vs "I want to act without data, aka NOT wait for data") and I think we
> should serve both of them, but it doesn't have to be with the same
> low-level feature.
> >
> > Kenn
> >
> >
> > On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> >>
> >> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke 
> wrote:
> >> >
> >> > While I'm currently on the other side of the fence, I would not be
> against changing/requiring the semantics of ProcessingTime constructs to be
> "must wait and execute" as such a solution, and enables the Proposed
> "batch" process continuation throttling mechanism to work as hypothesized
> for both "batch" and "streaming" execution.
> >> >
> >> > There's a lot to like, as it leans Beam further into the unification
> of Batch and Stream, with one fewer exception (eg. unifies timer experience
> further). It doesn't require a new primitive. It probably matches more with
> user expectations anyway.
> >> >
> >> > It does cause looping timer execution with processing time to be a
> problem for Drains however.
> >>
> >> I think we have a problem with looping timers plus drain (a mostly
> >> streaming idea anyway) regardless.
> >>
> >> > I'd argue though that in the case of a drain, we could update the
> semantics as "move watermark to infinity"  "existing timers are executed,
> but new timers are ignored",
> >>
> >> I don't like the idea of dropping timers for drain. I think correct
> >> handling here requires user visibility into whether a pipeline is
> >> draining or not.
> >>
> >> > and ensure/and update the requirements around OnWindowExpiration
> callbacks to be a bit more insistent on being implemented for correct
> execution, which is currently the only "hard" signal to the SDK side that
> the window's work is guaranteed to be over, and remaining state needs to be
> addressed by the transform or be garbage collected. This remains critical
> for developing a good pattern for ProcessingTime timers within a Global
> Window too.
> >>
> >> +1
> >>
> >> >
> >> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> >> > > Thanks for bringing this up.
> >> > >
> >> > > My position is that both batch and streaming should wait for
> >> > > processing time timers, according to local time (with the exception
> of
> >> > > tests that can accelerate this via faked clocks).
> >> > >
> >> > > Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
> >> > > isomorphic, and can be implemented in terms of each other (at least
> in
> >> > > one direction, and likely the other). Both are an indication that I
> >> > > can't act on something yet due to external constraints (e.g. not all
> >> > > the data has been published, or I lack sufficient 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský  wrote:
>
> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> > On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
> >> Pulling out focus points:
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't act on something yet [...] but I expect to be able to [...] at 
> >>> some time in the processing-time future.
> >> I like this as a clear and internally-consistent feature description. It 
> >> describes ProcessContinuation and those timers which serve the same 
> >> purpose as ProcessContinuation.
> >>
> >> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>  wrote:
> >>> I can't think of a batch or streaming scenario where it would be correct 
> >>> to not wait at least that long
> >> The main reason we created timers: to take action in the absence of data. 
> >> The archetypal use case for processing time timers was/is "flush data from 
> >> state if it has been sitting there too long". For this use case, the right 
> >> behavior for batch is to skip the timer. It is actually basically 
> >> incorrect to wait.
> > Good point calling out the distinction between "I need to wait in case
> > there's more data." and "I need to wait for something external." We
> > can't currently distinguish between the two, but a batch runner can
> > say something definitive about the first. Feels like we need a new
> > primitive (or at least new signaling information on our existing
> > primitive).
> Runners signal end of data to a DoFn via (input) watermark. Is there a
> need for additional information?

Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.
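
A minimal sketch of what "new signaling information on our existing primitive" could look like, assuming a timer carries an explicit intent. `TimerIntent`, `ProcessingTimeTimer` and `wait_needed` are hypothetical names for illustration, not part of any Beam SDK. The point is only that a runner which knows its input is exhausted can fire "waiting for more data" timers immediately, while "waiting for something external" timers still have to wait for the clock.

```python
from dataclasses import dataclass
from enum import Enum


class TimerIntent(Enum):
    WAIT_FOR_MORE_DATA = 1  # e.g. "flush state if it has been sitting too long"
    WAIT_FOR_EXTERNAL = 2   # e.g. quota refill, an external system catching up


@dataclass
class ProcessingTimeTimer:
    fire_at: float          # processing-time deadline, in seconds
    intent: TimerIntent


def wait_needed(timer, now, input_exhausted):
    """Seconds a runner should still wait before firing `timer`."""
    if input_exhausted and timer.intent is TimerIntent.WAIT_FOR_MORE_DATA:
        # No more data can arrive, so waiting adds nothing.
        return 0.0
    return max(0.0, timer.fire_at - now)


flush = ProcessingTimeTimer(fire_at=160.0, intent=TimerIntent.WAIT_FOR_MORE_DATA)
quota = ProcessingTimeTimer(fire_at=160.0, intent=TimerIntent.WAIT_FOR_EXTERNAL)

print(wait_needed(flush, now=100.0, input_exhausted=True))
print(wait_needed(quota, now=100.0, input_exhausted=True))
```

With the intent attached, the same timer API serves both use cases and the default could stay at today's behavior.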


Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 19:30, Robert Bradshaw via dev wrote:

On Tue, Feb 27, 2024 at 7:44 AM Robert Burke  wrote:

An "as fast as it can runner" with dynamic splits, would ultimately split to the system's 
maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, 
this is the maximum sharding of each input element's restriction). That's what would happen with a 
"normal" sleep.

WRT Portability, this means adding a current ProcessingTime field to the 
ProcessBundleRequest, and likely also to the ProgressRequest so the runner could 
coordinate. ProgressResponse may then need an "asleepUntil" field to communicate 
back the state of the bundle, which the runner could then use to better time its next 
ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the 
sleeping bundle is blocked until processing time has advanced anyway; no progress can be 
made.
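
A minimal sketch of the exchange described above, using plain Python dataclasses in place of the real Fn API messages. The `processing_time` and `asleep_until` fields, and the helper functions, are assumptions for illustration; they are not existing fields of the portability protos.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressRequest:
    processing_time: float          # runner's canonical processing time (assumed field)


@dataclass
class ProgressResponse:
    asleep_until: Optional[float] = None  # set while the bundle is sleeping (assumed field)


def handle_progress(request, sleep_deadline):
    """SDK side: report the sleep deadline if the bundle is still asleep."""
    if sleep_deadline is not None and sleep_deadline > request.processing_time:
        return ProgressResponse(asleep_until=sleep_deadline)
    return ProgressResponse()


def next_poll_time(response, now, default_interval=5.0):
    """Runner side: no point polling a sleeping bundle before it wakes up."""
    if response.asleep_until is not None and response.asleep_until > now:
        return response.asleep_until
    return now + default_interval


def may_split(response, now):
    """Runner side: hold off dynamic splitting while the bundle is asleep."""
    return response.asleep_until is None or response.asleep_until <= now


asleep = handle_progress(ProgressRequest(processing_time=100.0), sleep_deadline=130.0)
awake = handle_progress(ProgressRequest(processing_time=140.0), sleep_deadline=130.0)
print(next_poll_time(asleep, now=100.0), may_split(asleep, now=100.0))
print(next_poll_time(awake, now=140.0), may_split(awake, now=140.0))
```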

I like moving the abstraction out of the timer space, as it better aligns with 
user intent for the throttle case, and it doesn't require a Stateful DoFn to 
operate (orthogonal!), meaning it's useful for It also solves the testing issue 
WRT ProcessingTime timers using an absolute time, rather than a relative time, 
as the SDK can rebuild its relative setters for output time on the new 
canonical processing time, without user code changing.

The sleeping in-progress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as Reuven 
described earlier, since the user is only pushing back on immediate processing 
for the current element, not necessarily all elements. This is particularly 
likely if there's a long gap between ProgressRequests for the bundle and the 
runner doesn't adapt its cadence.

An external source of rate doesn't really exist, other than some external 
source that can provide throttle information. There would remain time skew 
between the runner system and the external system though, but for a throttle 
that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear" processing 
time so if there's a particularly long delay, it doesn't need to catch up at once. I 
don't think that's relevant for the throttle case though, since with the described clock 
mechanism and the communication back to the runner, the unblocking notion is probably 
fine.

On this note, I have become skeptical that a global throttling rate
can be done well with local information.

For streaming dataflow, we can have an approximate solution by knowing
the number of keys and doing per-key throttling because keys (at least
up to hundreds per worker) are all processed concurrently. This
solution doesn't even require state + timers and would best be done by
standard sleeps.
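
The arithmetic behind per-key throttling can be sketched as follows. The function names and the numbers are illustrative assumptions; the sketch only shows that the aggregate rate matches the global target when all keys really are processed concurrently, which is the premise stated above.

```python
def per_key_interval(global_rate_per_sec, num_keys):
    """Seconds each key should sleep between requests to share the global rate."""
    return num_keys / global_rate_per_sec


def aggregate_rate(interval, concurrent_keys):
    """Requests per second actually issued when `concurrent_keys` run at once."""
    return concurrent_keys / interval


# Assumed numbers: a 100 requests/s global budget shared by 400 keys.
interval = per_key_interval(global_rate_per_sec=100.0, num_keys=400)
print(interval)                                       # sleep per key, in seconds
print(aggregate_rate(interval, concurrent_keys=400))  # all keys concurrent
print(aggregate_rate(interval, concurrent_keys=8))    # only a few keys concurrent
```

When the number of keys actually running at once differs from the number assumed, the achieved rate drifts away from the target, which is why the degree of concurrency has to be known.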

For most other systems, including dataflow batch, this would massively
under throttle. Here we need to either add something to the model, or
do something outside the model, to discover, dynamically, how many
siblings are being concurrently run. (This could be done at a
worker/process level, rather than bundle level, as well.) The ability
to broadcast, aggregate, and read dynamic, provisional from all
workers could help in other cases too (e.g. a more efficient top N),
but this is a whole new thread...

So while I think the semantics of processing timers in batch is worth
solving, this probably isn't the best application.

Yes, it seems that under the assumption of dynamic parallelism defined 
by the runner, defining a global throttling rate is not possible under the 
current model. But maybe (rather than introducing a whole new concept) 
we could propagate the information about current parallelism from runner 
to DoFn via ProcessContext? For some runners that would be as easy as 
returning a constant. Dynamic runners would be more involved, but the 
only other option than propagating parallelism from runner to workers 
seems to be introduction of a whole new worker <-> runner communication 
channel, so that a worker could ask the runner for permission to proceed 
with processing data based on some (global) condition. It feels somewhat 
too complex given the motivating example. Maybe there could be others so 
that this could be generalized to a concept; what comes to mind is 
something Flink calls "watermark alignment", which throttles sources 
based on the event-time progress of individual partitions, so that 
partitions that are too far ahead in time do not blow up downstream state. 
These might be related concepts.
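
A simplified illustration of the watermark-alignment idea, not Flink's implementation: given the current event-time watermark of each partition and a maximum allowed drift, pause every partition that is further ahead of the slowest one than the drift allows. The function name, the partition ids and the numbers are assumptions for illustration.

```python
def partitions_to_pause(watermarks, max_drift):
    """Partitions whose watermark is more than `max_drift` ahead of the slowest."""
    slowest = min(watermarks.values())
    return sorted(p for p, wm in watermarks.items() if wm > slowest + max_drift)


# Assumed per-partition watermarks (seconds of event time).
watermarks = {"p0": 100, "p1": 125, "p2": 190}
paused = partitions_to_pause(watermarks, max_drift=30)
print(paused)
```

Like global throttling, this needs an aggregate over all workers (here the minimum watermark) to be fed back to each of them.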



We'd need a discussion of what an SDK must do if the runner doesn't support the 
central clock for completeness, and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:

On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter to 
@ProcessElement. That will be obviously useful and remove a source of 
inconsistency, in addition to letting the runner/SDK harness control it. I also 
like the idea of 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 19:22, Robert Bradshaw via dev wrote:

On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.

I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long

The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

Runners signal end of data to a DoFn via (input) watermark. Is there a 
need for additional information?



On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:

It doesn't require a new primitive.

IMO what's being proposed *is* a new primitive. I think it is a good primitive. 
It is the underlying primitive to ProcessContinuation. It would be 
user-friendly as a kind of timer. But if we made this the behavior of 
processing time timers retroactively, it would break everyone using them to 
flush data who is also reprocessing data.

There's two very different use cases ("I need to wait, and block data" vs "I want to 
act without data, aka NOT wait for data") and I think we should serve both of them, but it 
doesn't have to be with the same low-level feature.

Kenn


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
wrote:

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:

While I'm currently on the other side of the fence, I would not be against changing/requiring the semantics of 
ProcessingTime constructs to be "must wait and execute" as such a solution, and enables the Proposed 
"batch" process continuation throttling mechanism to work as hypothesized for both "batch" and 
"streaming" execution.

There's a lot to like, as it leans Beam further into the unification of Batch 
and Stream, with one fewer exception (eg. unifies timer experience further). It 
doesn't require a new primitive. It probably matches more with user 
expectations anyway.

It does cause looping timer execution with processing time to be a problem for 
Drains however.

I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.


I'd argue though that in the case of a drain, we could update the semantics as "move 
watermark to infinity"  "existing timers are executed, but new timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.


and ensure/and update the requirements around OnWindowExpiration callbacks to be a bit 
more insistent on being implemented for correct execution, which is currently the only 
"hard" signal to the SDK side that the window's work is guaranteed to be over, 
and remaining state needs to be addressed by the transform or be garbage collected. This 
remains critical for developing a good pattern for ProcessingTime timers within a Global 
Window too.

+1


On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:

Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiescence or other
operation to finish).


On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:

For me it always helps to seek analogy in our physical reality. Stream
processing actually has quite 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Tue, Feb 27, 2024 at 7:44 AM Robert Burke  wrote:
>
> An "as fast as it can" runner with dynamic splits would ultimately split to 
> the system's maximum available parallelism (for stateful DoFns, this is the 
> number of keys; for SplittableDoFns, this is the maximum sharding of each 
> input element's restriction). That's what would happen with a "normal" sleep.
>
> WRT Portability, this means adding a current ProcessingTime field to the 
> ProcessBundleRequest, and likely also to the ProgressRequest so the runner 
> could coordinate. ProgressResponse may then need an "asleepUntil" field to 
> communicate back the state of the bundle, which the runner could then use to 
> better time its next ProgressRequest, and potentially arrest dynamic 
> splitting for that bundle. After all, the sleeping bundle is blocked until 
> processing time has advanced anyway; no progress can be made.
>
> I like moving the abstraction out of the timer space, as it better aligns 
> with user intent for the throttle case, and it doesn't require a Stateful 
> DoFn to operate (orthogonal!), meaning it's useful for It also solves the 
> testing issue WRT ProcessingTime timers using an absolute time, rather than a 
> relative time, as the SDK can rebuild its relative setters for output time 
> on the new canonical processing time, without user code changing.
>
> The sleeping in-progress bundle naturally holds back the watermark too.
>
> I suspect this mechanism would end up tending to over throttle as Reuven 
> described earlier, since the user is only pushing back on immediate 
> processing for the current element, not necessarily all elements. This is 
> particularly likely if there's a long gap between ProgressRequests for the 
> bundle and the runner doesn't adapt its cadence.
>
> An external source of rate doesn't really exist, other than some external 
> source that can provide throttle information. There would remain time skew 
> between the runner system and the external system though, but for a throttle 
> that's likely fine.
>
> A central notion of ProcessingTime also allows the runner to "smear" 
> processing time so if there's a particularly long delay, it doesn't need to 
> catch up at once. I don't think that's relevant for the throttle case though, 
> since with the described clock mechanism and the communication back to the 
> runner, the unblocking notion is probably fine.

On this note, I have become skeptical that a global throttling rate
can be done well with local information.

For streaming dataflow, we can have an approximate solution by knowing
the number of keys and doing per-key throttling because keys (at least
up to hundreds per worker) are all processed concurrently. This
solution doesn't even require state + timers and would best be done by
standard sleeps.

For most other systems, including dataflow batch, this would massively
under throttle. Here we need to either add something to the model, or
do something outside the model, to discover, dynamically, how many
siblings are being concurrently run. (This could be done at a
worker/process level, rather than bundle level, as well.) The ability
to broadcast, aggregate, and read dynamic, provisional data from all
workers could help in other cases too (e.g. a more efficient top N),
but this is a whole new thread...

So while I think the semantics of processing timers in batch is worth
solving, this probably isn't the best application.
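
To make the per-key arithmetic above concrete: if a global budget of R requests per second is shared by N keys that are processed concurrently, each key may issue one request every N/R seconds. The following is only a minimal Python sketch under that assumption. `PerKeyThrottle`, `per_key_delay_seconds`, and the injected `clock` and `sleep` callables are hypothetical names for illustration, not Beam APIs, and the result is only as good as the estimate of N.

```python
import time


def per_key_delay_seconds(global_rate_per_sec: float, concurrent_keys: int) -> float:
    """Gap each key leaves between its own requests so that all keys
    together stay within the global rate (assumes the keys really do
    run concurrently)."""
    if global_rate_per_sec <= 0 or concurrent_keys <= 0:
        raise ValueError("rate and key count must be positive")
    return concurrent_keys / global_rate_per_sec


class PerKeyThrottle:
    """Hypothetical per-key throttle that uses plain sleeps, no state or timers."""

    def __init__(self, global_rate_per_sec, concurrent_keys,
                 clock=time.monotonic, sleep=time.sleep):
        self._delay = per_key_delay_seconds(global_rate_per_sec, concurrent_keys)
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = {}  # key -> earliest time of the next request

    def acquire(self, key) -> float:
        """Block until `key` may issue a request; return the seconds waited."""
        now = self._clock()
        ready_at = self._next_allowed.get(key, now)
        waited = max(0.0, ready_at - now)
        if waited > 0:
            self._sleep(waited)
        self._next_allowed[key] = max(now, ready_at) + self._delay
        return waited
```

Passing the clock and sleep function in as arguments keeps the sketch testable without real waiting.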

> We'd need a discussion of what an SDK must do if the runner doesn't support 
> the central clock for completeness, and consistency.
>
>
> On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:
>>
>> On 2/27/24 14:51, Kenneth Knowles wrote:
>>
>> I very much like the idea of processing time clock as a parameter to 
>> @ProcessElement. That will be obviously useful and remove a source of 
>> inconsistency, in addition to letting the runner/SDK harness control it. I 
>> also like the idea of passing a Sleeper or to @ProcessElement. These are 
>> both good practices for testing and flexibility and runner/SDK language 
>> differences.
>>
>> In your (a) (b) (c) can you be more specific about which watermarks you are 
>> referring to? Are they the same as in my opening email? If so, then what you 
>> describe is what we already have.
>>
>> Yes, we have that for streaming, but it does not work this way in batch. In 
>> my understanding we violate (a), we ignore (b) because we fire timers at GC 
>> time only and (c) is currently relevant only immediately preceding window GC 
>> time, but can be defined more generally. But essentially yes, I was just 
>> trying to restate the streaming processing time semantics in the limited 
>> batch case.
>>
>>
>> Kenn
>>
>> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:
>>>
>>> I think that before we introduce a possibly somewhat duplicate new feature 
>>> we should be certain that it is really semantically different. I'll 
>>> rephrase the two cases:
>>>
>>>  a) need to wait and block data 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Bradshaw via dev
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles  wrote:
>
> Pulling out focus points:
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at some 
> > time in the processing-time future.
>
> I like this as a clear and internally-consistent feature description. It 
> describes ProcessContinuation and those timers which serve the same purpose 
> as ProcessContinuation.
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
> > I can't think of a batch or streaming scenario where it would be correct to 
> > not wait at least that long
>
> The main reason we created timers: to take action in the absence of data. The 
> archetypal use case for processing time timers was/is "flush data from state 
> if it has been sitting there too long". For this use case, the right behavior 
> for batch is to skip the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).

> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> > It doesn't require a new primitive.
>
> IMO what's being proposed *is* a new primitive. I think it is a good 
> primitive. It is the underlying primitive to ProcessContinuation. It would be 
> user-friendly as a kind of timer. But if we made this the behavior of 
> processing time timers retroactively, it would break everyone using them to 
> flush data who is also reprocessing data.
>
> There's two very different use cases ("I need to wait, and block data" vs "I 
> want to act without data, aka NOT wait for data") and I think we should serve 
> both of them, but it doesn't have to be with the same low-level feature.
>
> Kenn
>
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev  
> wrote:
>>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>> >
>> > While I'm currently on the other side of the fence, I would not be against 
>> > changing/requiring the semantics of ProcessingTime constructs to be "must 
>> > wait and execute" as such a solution, and enables the Proposed "batch" 
>> > process continuation throttling mechanism to work as hypothesized for both 
>> > "batch" and "streaming" execution.
>> >
>> > There's a lot to like, as it leans Beam further into the unification of 
>> > Batch and Stream, with one fewer exception (eg. unifies timer experience 
>> > further). It doesn't require a new primitive. It probably matches more 
>> > with user expectations anyway.
>> >
>> > It does cause looping timer execution with processing time to be a problem 
>> > for Drains however.
>>
>> I think we have a problem with looping timers plus drain (a mostly
>> streaming idea anyway) regardless.
>>
>> > I'd argue though that in the case of a drain, we could update the 
>> > semantics to "move watermark to infinity", "existing timers are executed, 
>> > but new timers are ignored",
>>
>> I don't like the idea of dropping timers for drain. I think correct
>> handling here requires user visibility into whether a pipeline is
>> draining or not.
>>
>> > and ensure/and update the requirements around OnWindowExpiration callbacks 
>> > to be a bit more insistent on being implemented for correct execution, 
>> > which is currently the only "hard" signal to the SDK side that the 
>> > window's work is guaranteed to be over, and remaining state needs to be 
>> > addressed by the transform or be garbage collected. This remains critical 
>> > for developing a good pattern for ProcessingTime timers within a Global 
>> > Window too.
>>
>> +1
>>
>> >
>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>> > > Thanks for bringing this up.
>> > >
>> > > My position is that both batch and streaming should wait for
>> > > processing time timers, according to local time (with the exception of
>> > > tests that can accelerate this via faked clocks).
>> > >
>> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
>> > > isomorphic, and can be implemented in terms of each other (at least in
>> > > one direction, and likely the other). Both are an indication that I
>> > > can't act on something yet due to external constraints (e.g. not all
>> > > the data has been published, or I lack sufficient capacity/quota to
>> > > push things downstream) but I expect to be able to (or at least would
>> > > like to check again) at some time in the processing-time future. I
>> > > can't think of a batch or streaming scenario where it would be correct
>> > > to not wait at least that long (even in batch inputs, e.g. suppose I'm
>> > > tailing logs and was eagerly started before they were fully written,
>> > > or waiting for some kind of (non-data-dependent) 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 16:36, Robert Burke wrote:
An "as fast as it can" runner with dynamic splits would ultimately 
split to the system's maximum available parallelism (for stateful 
DoFns, this is the number of keys; for SplittableDoFns, this is the 
maximum sharding of each input element's restriction). That's what 
would happen with a "normal" sleep.
I see. It is definitely possible for a runner to split all processing to 
maximum parallelism, but - provided this cannot be controlled by the user - 
can the semantics of the Throttle transform even be consistently defined 
in terms of processing time? It seems it would require coordination with 
the runner so that user code would at least be aware of the current 
parallelism. The situation is easier for runners that set parallelism 
upfront.


WRT Portability, this means adding a current ProcessingTime field to 
the ProcessBundleRequest, and likely also to the ProgressRequest so 
the runner could coordinate. ProgressResponse may then need an 
"asleepUntil" field to communicate back the state of the bundle, which 
the runner could then use to better time its next ProgressRequest, and 
potentially arrest dynamic splitting for that bundle. After all, the 
sleeping bundle is blocked until processing time has advanced anyway; 
no progress can be made.


I like moving the abstraction out of the timer space, as it better 
aligns with user intent for the throttle case, and it doesn't require 
a Stateful DoFn to operate (orthogonal!), meaning it's useful for It 
also solves the testing issue WRT ProcessingTime timers using an 
absolute time, rather than a relative time, as the SDK can rebuild 
its relative setters for output time on the new canonical processing 
time, without user code changing.
With what was said above - is the definition of sleep (pause) valid in 
the context of a bundle? By the same logic of splitting keys, a "fast 
and efficient enough" runner could delay only the paused bundle and start 
processing a different bundle (via a different DoFn). It might require 
splitting bundles by keys, but should be possible. It seems that would in 
the end make the feature useless as well.


The sleeping in-progress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as 
Reuven described earlier, since the user is only pushing back on 
immediate processing for the current element, not necessarily all 
elements. This is particularly likely if there's a long gap between 
ProgressRequests for the bundle and the runner doesn't adapt its cadence.


An external source of rate doesn't really exist, other than some 
external source that can provide throttle information. There would 
remain time skew between the runner system and the external system 
though, but for a throttle that's likely fine.


A central notion of ProcessingTime also allows the runner to "smear" 
processing time so if there's a particularly long delay, it doesn't 
need to catch up at once. I don't think that's relevant for the 
throttle case though, since with the described clock mechanism and the 
communication back to the runner, the unblocking notion is probably fine.


We'd need a discussion of what an SDK must do if the runner doesn't 
support the central clock for completeness, and consistency.



On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:

On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter
to @ProcessElement. That will be obviously useful and remove a
source of inconsistency, in addition to letting the runner/SDK
harness control it. I also like the idea of passing a Sleeper or
to @ProcessElement. These are both good practices for testing and
flexibility and runner/SDK language differences.

In your (a) (b) (c) can you be more specific about which
watermarks you are referring to? Are they the same as in my
opening email? If so, then what you describe is what we already have.

Yes, we have that for streaming, but it does not work this way in
batch. In my understanding we violate (a), we ignore (b) because
we fire timers at GC time only and (c) is currently relevant only
immediately preceding window GC time, but can be defined more
generally. But essentially yes, I was just trying to restate the
streaming processing time semantics in the limited batch case.


Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:

I think that before we introduce a possibly somewhat
duplicate new feature we should be certain that it is really
semantically different. I'll rephrase the two cases:

 a) need to wait and block data (delay) - the use case is the
motivating example of Throttle transform

 b) act without data, not block

Provided we align processing time with local machine clock
(or better, because of testing, make current processing time
available via context 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Robert Burke
An "as fast as it can" runner with dynamic splits would ultimately split
to the system's maximum available parallelism (for stateful DoFns, this is
the number of keys; for SplittableDoFns, this is the maximum sharding of
each input element's restriction). That's what would happen with a "normal"
sleep.

WRT Portability, this means adding a current ProcessingTime field to the
ProcessBundleRequest, and likely also to the ProgressRequest so the runner
could coordinate. ProgressResponse may then need an "asleepUntil" field to
communicate back the state of the bundle, which the runner could then use
to better time its next ProgressRequest, and potentially arrest dynamic
splitting for that bundle. After all, the sleeping bundle is blocked until
processing time has advanced anyway; no progress can be made.
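
As a rough illustration of the exchange described above, here is a minimal Python sketch of how a runner might use such fields. Everything in it is an assumption made for illustration: `ProgressRequestSketch`, `ProgressResponseSketch`, `processing_time_ms`, `asleep_until_ms`, `next_poll_time_ms`, and `may_split` are hypothetical names that only model the idea and do not exist in the Beam portability protos.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProgressRequestSketch:
    # Hypothetical: the runner's canonical processing time, sent to the SDK.
    processing_time_ms: int


@dataclass(frozen=True)
class ProgressResponseSketch:
    # Hypothetical: set by the SDK when the bundle is sleeping, else None.
    asleep_until_ms: Optional[int] = None


def next_poll_time_ms(request: ProgressRequestSketch,
                      response: ProgressResponseSketch,
                      default_interval_ms: int = 1000) -> int:
    """When the runner should send its next progress request.

    A sleeping bundle cannot make progress before asleep_until_ms,
    so polling earlier than that is pointless.
    """
    normal = request.processing_time_ms + default_interval_ms
    if response.asleep_until_ms is None:
        return normal
    return max(normal, response.asleep_until_ms)


def may_split(request: ProgressRequestSketch,
              response: ProgressResponseSketch) -> bool:
    """Arrest dynamic splitting while the bundle is still asleep."""
    return (response.asleep_until_ms is None
            or response.asleep_until_ms <= request.processing_time_ms)
```

For example, a bundle that reports it is asleep until 15000 ms while the runner clock reads 10000 ms would next be polled at 15000 ms and would not be split in the meantime.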

I like moving the abstraction out of the timer space, as it better aligns
with user intent for the throttle case, and it doesn't require a Stateful
DoFn to operate (orthogonal!), meaning it's useful for It also solves the
testing issue WRT ProcessingTime timers using an absolute time, rather than
a relative time, as the SDK can rebuild its relative setters for output
time on the new canonical processing time, without user code changing.

The sleeping in-progress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as Reuven
described earlier, since the user is only pushing back on immediate
processing for the current element, not necessarily all elements. This is
particularly likely if there's a long gap between ProgressRequests for the
bundle and the runner doesn't adapt its cadence.

An external source of rate doesn't really exist, other than some external
source that can provide throttle information. There would remain time skew
between the runner system and the external system though, but for a
throttle that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear"
processing time so if there's a particularly long delay, it doesn't need to
catch up at once. I don't think that's relevant for the throttle case
though, since with the described clock mechanism and the communication back
to the runner, the unblocking notion is probably fine.

We'd need a discussion of what an SDK must do if the runner doesn't support
the central clock for completeness, and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský  wrote:

> On 2/27/24 14:51, Kenneth Knowles wrote:
>
> I very much like the idea of processing time clock as a parameter
> to @ProcessElement. That will be obviously useful and remove a source of
> inconsistency, in addition to letting the runner/SDK harness control it. I
> also like the idea of passing a Sleeper or to @ProcessElement. These are
> both good practices for testing and flexibility and runner/SDK language
> differences.
>
> In your (a) (b) (c) can you be more specific about which watermarks you
> are referring to? Are they the same as in my opening email? If so, then
> what you describe is what we already have.
>
> Yes, we have that for streaming, but it does not work this way in batch.
> In my understanding we violate (a), we ignore (b) because we fire timers at
> GC time only and (c) is currently relevant only immediately preceding
> window GC time, but can be defined more generally. But essentially yes, I
> was just trying to restate the streaming processing time semantics in the
> limited batch case.
>
>
> Kenn
>
> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:
>
>> I think that before we introduce a possibly somewhat duplicate new
>> feature we should be certain that it is really semantically different. I'll
>> rephrase the two cases:
>>
>>  a) need to wait and block data (delay) - the use case is the motivating
>> example of Throttle transform
>>
>>  b) act without data, not block
>>
>> Provided we align processing time with local machine clock (or better,
>> because of testing, make current processing time available via context to
>> @ProcessElement) it seems possible to unify both cases under slightly
>> updated semantics of processing time timer in batch:
>>
>>  a) processing time timers fire with best-effort, i.e. trying to minimize
>> delay between firing timestamp and timer's timestamp
>>  b) timer is valid only in the context of current key-window, once
>> watermark passes window GC time for the particular window that created the
>> timer, it is ignored
>>  c) if timer has output timestamp, this timestamp holds watermark (but
>> this is currently probably a no-op, because runners currently do not propagate
>> (per-key) watermark in batch, I assume)
>>
>> In case b) it might be necessary to distinguish cases when the timer has an
>> output timestamp; if so, it probably should be taken into account.
>>
>> Now, such semantics should be quite aligned with what we do in streaming
>> case and what users generally expect. The blocking part can be implemented
>> in @ProcessElement using buffer & timer, 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Jan Lukavský

On 2/27/24 14:51, Kenneth Knowles wrote:
I very much like the idea of processing time clock as a parameter 
to @ProcessElement. That will be obviously useful and remove a source 
of inconsistency, in addition to letting the runner/SDK harness 
control it. I also like the idea of passing a Sleeper or 
to @ProcessElement. These are both good practices for testing and 
flexibility and runner/SDK language differences.


In your (a) (b) (c) can you be more specific about which watermarks 
you are referring to? Are they the same as in my opening email? If so, 
then what you describe is what we already have.
Yes, we have that for streaming, but it does not work this way in batch. 
In my understanding we violate (a), we ignore (b) because we fire timers 
at GC time only and (c) is currently relevant only immediately preceding 
window GC time, but can be defined more generally. But essentially yes, 
I was just trying to restate the streaming processing time semantics in 
the limited batch case.


Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:

I think that before we introduce a possibly somewhat duplicate new
feature we should be certain that it is really semantically
different. I'll rephrase the two cases:

 a) need to wait and block data (delay) - the use case is the
motivating example of Throttle transform

 b) act without data, not block

Provided we align processing time with local machine clock (or
better, because of testing, make current processing time available
via context to @ProcessElement) it seems possible to unify both
cases under slightly updated semantics of processing time timer in
batch:

 a) processing time timers fire with best-effort, i.e. trying to
minimize delay between firing timestamp and timer's timestamp
 b) timer is valid only in the context of current key-window, once
watermark passes window GC time for the particular window that
created the timer, it is ignored
 c) if timer has output timestamp, this timestamp holds watermark
(but this is currently probably a no-op, because runners currently do
not propagate (per-key) watermark in batch, I assume)

In case b) it might be necessary to distinguish cases when the timer
has an output timestamp; if so, it probably should be taken into account.

Now, such semantics should be quite aligned with what we do in
streaming case and what users generally expect. The blocking part
can be implemented in @ProcessElement using buffer & timer, once
there is need to wait, it can be implemented in user code using
plain sleep(). That is due to the alignment between local time and
definition of processing time. If we had some reason to be able to
run faster-than-wall-clock (as I'm still not in favor of that), we
could do that using ProcessContext.sleep(). Delaying processing in
the @ProcessElement should result in backpressuring and
backpropagation of this backpressure from the Throttle transform
to the sources as mentioned (of course this is only for the
streaming case).

Is there anything missing in such definition that would still
require splitting the timers into two distinct features?

 Jan

On 2/26/24 21:22, Kenneth Knowles wrote:

Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even
allowed to be set outside the window (or you'd end up with an
element assigned to a window that it isn't within, since
OutputTime essentially represents reserving the right to output
an element with that timestamp)

Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke 
wrote:

Agreed that a retroactive behavior change would be bad, even
if tied to a beam version change. I agree that it meshes well
with the general theme of State & Timers exposing underlying
primitives for implementing Windowing and similar. I'd say
the distinction between the two might be additional
complexity for users to grok, and would need to be documented
well, as both operate in the ProcessingTime domain, but
differently.

What to call this new timer then? DelayTimer?

"A DelayTimer sets an instant in ProcessingTime at which
point computations can continue. Runners will prevent the
EventTimer watermark from advancing past the set OutputTime
until Processing Time has advanced to at least the provided
instant to execute the timer's callback. This can be used to
allow the runner to constrain pipeline throughput with user
guidance."

I'd probably add that a timer with an output time outside of
the window would not be guaranteed to fire, and that
OnWindowExpiry is the correct way to ensure cleanup occurs.

No solution to the Looping Timers on Drain problem here, but
I think that's 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-27 Thread Kenneth Knowles
I very much like the idea of processing time clock as a parameter
to @ProcessElement. That will be obviously useful and remove a source of
inconsistency, in addition to letting the runner/SDK harness control it. I
also like the idea of passing a Sleeper or to @ProcessElement. These are
both good practices for testing and flexibility and runner/SDK language
differences.
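
A minimal Python sketch of the clock-and-sleeper idea, to show why it helps testing. The names `ProcessingClock`, `WallClock`, `FakeClock`, and the free function `process_element` are hypothetical and only illustrate the pattern; they are not part of any Beam SDK.

```python
import time
from typing import Protocol


class ProcessingClock(Protocol):
    """Hypothetical clock plus sleeper handed to element processing."""

    def now(self) -> float: ...

    def sleep(self, seconds: float) -> None: ...


class WallClock:
    """Production behavior: processing time is the local machine clock."""

    def now(self) -> float:
        return time.time()

    def sleep(self, seconds: float) -> None:
        time.sleep(seconds)


class FakeClock:
    """Test behavior: sleeping advances time instantly."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def sleep(self, seconds: float) -> None:
        self._now += seconds


def process_element(element, ready_at: float, clock: ProcessingClock):
    """Wait until processing time reaches ready_at, then emit the element
    together with the processing time at which it was emitted."""
    remaining = ready_at - clock.now()
    if remaining > 0:
        clock.sleep(remaining)
    return element, clock.now()
```

Because the user code never reads the system clock directly, a runner or test harness can substitute its own notion of processing time without any change to that code.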

In your (a) (b) (c) can you be more specific about which watermarks you are
referring to? Are they the same as in my opening email? If so, then what
you describe is what we already have.

Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský  wrote:

> I think that before we introduce a possibly somewhat duplicate new feature
> we should be certain that it is really semantically different. I'll
> rephrase the two cases:
>
>  a) need to wait and block data (delay) - the use case is the motivating
> example of Throttle transform
>
>  b) act without data, not block
>
> Provided we align processing time with local machine clock (or better,
> because of testing, make current processing time available via context to
> @ProcessElement) it seems possible to unify both cases under slightly
> updated semantics of processing time timer in batch:
>
>  a) processing time timers fire with best-effort, i.e. trying to minimize
> delay between firing timestamp and timer's timestamp
>  b) timer is valid only in the context of current key-window, once
> watermark passes window GC time for the particular window that created the
> timer, it is ignored
>  c) if timer has output timestamp, this timestamp holds watermark (but
> this is currently probably a no-op, because runners currently do not propagate
> (per-key) watermark in batch, I assume)
>
> In case b) it might be necessary to distinguish cases when the timer has an
> output timestamp; if so, it probably should be taken into account.
>
> Now, such semantics should be quite aligned with what we do in streaming
> case and what users generally expect. The blocking part can be implemented
> in @ProcessElement using buffer & timer, once there is need to wait, it can
> be implemented in user code using plain sleep(). That is due to the
> alignment between local time and definition of processing time. If we had
> some reason to be able to run faster-than-wall-clock (as I'm still not in
> favor of that), we could do that using ProcessContext.sleep(). Delaying
> processing in the @ProcessElement should result in backpressuring and
> backpropagation of this backpressure from the Throttle transform to the
> sources as mentioned (of course this is only for the streaming case).
>
> Is there anything missing in such definition that would still require
> splitting the timers into two distinct features?
>
>  Jan
> On 2/26/24 21:22, Kenneth Knowles wrote:
>
> Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.
>
> OutputTime is always an event time timestamp so it isn't even allowed to
> be set outside the window (or you'd end up with an element assigned to a
> window that it isn't within, since OutputTime essentially represents
> reserving the right to output an element with that timestamp)
>
> Kenn
>
> On Mon, Feb 26, 2024 at 3:19 PM Robert Burke  wrote:
>
>> Agreed that a retroactive behavior change would be bad, even if tied to a
>> beam version change. I agree that it meshes well with the general theme of
>> State & Timers exposing underlying primitives for implementing Windowing
>> and similar. I'd say the distinction between the two might be additional
>> complexity for users to grok, and would need to be documented well, as both
>> operate in the ProcessingTime domain, but differently.
>>
>> What to call this new timer then? DelayTimer?
>>
>> "A DelayTimer sets an instant in ProcessingTime at which point
>> computations can continue. Runners will prevent the EventTimer watermark
>> from advancing past the set OutputTime until Processing Time has advanced
>> to at least the provided instant to execute the timer's callback. This can
>> be used to allow the runner to constrain pipeline throughput with user
>> guidance."
>>
>> I'd probably add that a timer with an output time outside of the window
>> would not be guaranteed to fire, and that OnWindowExpiry is the correct way
>> to ensure cleanup occurs.
>>
>> No solution to the Looping Timers on Drain problem here, but I think
>> that's ultimately an orthogonal discussion, and will restrain my thoughts
>> on that for now.
>>
>> This isn't a proposal, but exploring the solution space within our
>> problem. We'd want to break down exactly what is different and the same for
>> the 3 kinds of timers...
>>
>>
>>
>>
>> On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles  wrote:
>>
>>> Pulling out focus points:
>>>
>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>> > I can't act on something yet [...] but I expect to be able to [...] at
>>> some time in the processing-time future.
>>>
>>> I like this as a clear and 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-26 Thread Jan Lukavský
I think that before we introduce a possibly somewhat duplicate new 
feature we should be certain that it is really semantically different. 
I'll rephrase the two cases:


 a) need to wait and block data (delay) - the use case is the 
motivating example of Throttle transform


 b) act without data, not block

Provided we align processing time with local machine clock (or better, 
because of testing, make current processing time available via context 
to @ProcessElement) it seems possible to unify both cases under 
slightly updated semantics of processing time timer in batch:


 a) processing time timers fire on a best-effort basis, i.e. trying to 
minimize the delay between the firing timestamp and the timer's timestamp
 b) a timer is valid only in the context of the current key-window; once 
the watermark passes the window GC time for the particular window that 
created the timer, it is ignored
 c) if a timer has an output timestamp, this timestamp holds the watermark 
(but this is currently probably a noop, because runners currently do not 
propagate the (per-key) watermark in batch, I assume)


In case b) we might need to distinguish the case where the timer has an 
output timestamp; if so, it should probably be taken into account.
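
To make rules a) and b) concrete, here is a minimal sketch in plain Python, 
not the Beam API. The names ProcessingTimeTimer, window_gc_time and 
due_timers are hypothetical and exist only for illustration. It assumes a 
timer is ignored as soon as the input watermark is strictly past the GC 
time of its window, and it does not model the output-timestamp hold from c).

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ProcessingTimeTimer:
    """Hypothetical model of a timer, not a Beam class."""
    name: str
    fire_at: float         # processing-time instant the timer asks for
    window_gc_time: float  # event time at which the owning window is GC'd


def due_timers(
    timers: List[ProcessingTimeTimer],
    processing_time: float,
    input_watermark: float,
) -> List[ProcessingTimeTimer]:
    """Return the timers that may fire now, earliest first."""
    # Rule b): timers of windows the watermark has already passed are ignored.
    live = [t for t in timers if input_watermark <= t.window_gc_time]
    # Rule a): best effort, fire everything whose instant has been reached.
    due = [t for t in live if t.fire_at <= processing_time]
    return sorted(due, key=lambda t: t.fire_at)
```

With this model, a timer whose window has expired never fires, regardless 
of how far processing time has advanced.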


Now, such semantics should be quite aligned with what we do in the 
streaming case and what users generally expect. The blocking part can be 
implemented in @ProcessElement using buffer & timer; once there is a need 
to wait, it can be implemented in user code using plain sleep(). That is 
due to the alignment between local time and the definition of processing 
time. If we had some reason to be able to run faster-than-wall-clock 
(which I'm still not in favor of), we could do that using 
ProcessContext.sleep(). Delaying processing in @ProcessElement 
should result in backpressure and propagation of this backpressure 
from the Throttle transform back to the sources as mentioned (of course 
this is only for the streaming case).
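
The sleep-based variant can be sketched as follows. This is plain Python 
with no Beam dependency; throttle, FakeClock and the now/sleep parameters 
are hypothetical names chosen for illustration. The clock is injected so 
that a test can substitute a fake one, which is the same idea as exposing 
processing time via the context.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttle(
    elements: Iterable[T],
    min_interval_s: float,
    now: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[T]:
    """Yield elements no faster than one per min_interval_s seconds."""
    next_allowed = now()
    for element in elements:
        delay = next_allowed - now()
        if delay > 0:
            # Blocking here is what would backpressure upstream stages.
            sleep(delay)
        next_allowed = max(now(), next_allowed) + min_interval_s
        yield element


class FakeClock:
    """Test clock: sleeping advances time instantly instead of waiting."""

    def __init__(self) -> None:
        self.t = 0.0

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds
```

In a test, passing FakeClock().now and FakeClock().sleep of the same 
instance lets the throttled loop finish immediately while still recording 
how long it would have waited.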


Is there anything missing in such a definition that would still require 
splitting the timers into two distinct features?


 Jan

On 2/26/24 21:22, Kenneth Knowles wrote:

Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even allowed 
to be set outside the window (or you'd end up with an element assigned 
to a window that it isn't within, since OutputTime essentially 
represents reserving the right to output an element with that timestamp)


Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke  wrote:

Agreed that a retroactive behavior change would be bad, even if
tied to a beam version change. I agree that it meshes well with
the general theme of State & Timers exposing underlying primitives
for implementing Windowing and similar. I'd say the distinction
between the two might be additional complexity for users to grok,
and would need to be documented well, as both operate in the
ProcessingTime domain, but differently.

What to call this new timer then? DelayTimer?

"A DelayTimer sets an instant in ProcessingTime at which point
computations can continue. Runners will prevent the EventTimer
watermark from advancing past the set OutputTime until Processing
Time has advanced to at least the provided instant to execute the
timers callback. This can be used to allow the runner to constrain
pipeline throughput with user guidance."

I'd probably add that a timer with an output time outside of the
window would not be guaranteed to fire, and that OnWindowExpiry is
the correct way to ensure cleanup occurs.

No solution to the Looping Timers on Drain problem here, but i
think that's ultimately an orthogonal discussion, and will
restrain my thoughts on that for now.

This isn't a proposal, but exploring the solution space within our
problem. We'd want to break down exactly what different and the
same for the 3 kinds of timers...




On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles 
wrote:

Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
 wrote:
> I can't act on something yet [...] but I expect to be able
to [...] at some time in the processing-time future.

I like this as a clear and internally-consistent feature
description. It describes ProcessContinuation and those timers
which serve the same purpose as ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
 wrote:
> I can't think of a batch or streaming scenario where it
would be correct to not wait at least that long

The main reason we created timers: to take action in the
absence of data. The archetypal use case for processing time
timers was/is "flush data from state if it has been sitting
there too long". For this use case, the right behavior for
batch is to skip the timer. It is actually 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-26 Thread Kenneth Knowles
Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even allowed to be
set outside the window (or you'd end up with an element assigned to a
window that it isn't within, since OutputTime essentially represents
reserving the right to output an element with that timestamp)

Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke  wrote:

> Agreed that a retroactive behavior change would be bad, even if tied to a
> beam version change. I agree that it meshes well with the general theme of
> State & Timers exposing underlying primitives for implementing Windowing
> and similar. I'd say the distinction between the two might be additional
> complexity for users to grok, and would need to be documented well, as both
> operate in the ProcessingTime domain, but differently.
>
> What to call this new timer then? DelayTimer?
>
> "A DelayTimer sets an instant in ProcessingTime at which point
> computations can continue. Runners will prevent the EventTimer watermark
> from advancing past the set OutputTime until Processing Time has advanced
> to at least the provided instant to execute the timers callback. This can
> be used to allow the runner to constrain pipeline throughput with user
> guidance."
>
> I'd probably add that a timer with an output time outside of the window
> would not be guaranteed to fire, and that OnWindowExpiry is the correct way
> to ensure cleanup occurs.
>
> No solution to the Looping Timers on Drain problem here, but i think
> that's ultimately an orthogonal discussion, and will restrain my thoughts
> on that for now.
>
> This isn't a proposal, but exploring the solution space within our
> problem. We'd want to break down exactly what different and the same for
> the 3 kinds of timers...
>
>
>
>
> On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles  wrote:
>
>> Pulling out focus points:
>>
>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>> > I can't act on something yet [...] but I expect to be able to [...] at
>> some time in the processing-time future.
>>
>> I like this as a clear and internally-consistent feature description. It
>> describes ProcessContinuation and those timers which serve the same purpose
>> as ProcessContinuation.
>>
>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>> > I can't think of a batch or streaming scenario where it would be
>> correct to not wait at least that long
>>
>> The main reason we created timers: to take action in the absence of data.
>> The archetypal use case for processing time timers was/is "flush data from
>> state if it has been sitting there too long". For this use case, the right
>> behavior for batch is to skip the timer. It is actually basically incorrect
>> to wait.
>>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>> > It doesn't require a new primitive.
>>
>> IMO what's being proposed *is* a new primitive. I think it is a good
>> primitive. It is the underlying primitive to ProcessContinuation. It
>> would be user-friendly as a kind of timer. But if we made this the behavior
>> of processing time timers retroactively, it would break everyone using them
>> to flush data who is also reprocessing data.
>>
>> There's two very different use cases ("I need to wait, and block data" vs
>> "I want to act without data, aka NOT wait for data") and I think we should
>> serve both of them, but it doesn't have to be with the same low-level
>> feature.
>>
>> Kenn
>>
>>
>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke 
>>> wrote:
>>> >
>>> > While I'm currently on the other side of the fence, I would not be
>>> against changing/requiring the semantics of ProcessingTime constructs to be
>>> "must wait and execute" as such a solution, and enables the Proposed
>>> "batch" process continuation throttling mechanism to work as hypothesized
>>> for both "batch" and "streaming" execution.
>>> >
>>> > There's a lot to like, as it leans Beam further into the unification
>>> of Batch and Stream, with one fewer exception (eg. unifies timer experience
>>> further). It doesn't require a new primitive. It probably matches more with
>>> user expectations anyway.
>>> >
>>> > It does cause looping timer execution with processing time to be a
>>> problem for Drains however.
>>>
>>> I think we have a problem with looping timers plus drain (a mostly
>>> streaming idea anyway) regardless.
>>>
>>> > I'd argue though that in the case of a drain, we could updated the
>>> semantics as "move watermark to infinity"  "existing timers are executed,
>>> but new timers are ignored",
>>>
>>> I don't like the idea of dropping timers for drain. I think correct
>>> handling here requires user visibility into whether a pipeline is
>>> draining or not.
>>>
>>> > and ensure/and update the requirements around OnWindowExpiration
>>> callbacks to be 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-26 Thread Robert Burke
Agreed that a retroactive behavior change would be bad, even if tied to a
beam version change. I agree that it meshes well with the general theme of
State & Timers exposing underlying primitives for implementing Windowing
and similar. I'd say the distinction between the two might be additional
complexity for users to grok, and would need to be documented well, as both
operate in the ProcessingTime domain, but differently.

What to call this new timer then? DelayTimer?

"A DelayTimer sets an instant in ProcessingTime at which point computations
can continue. Runners will prevent the EventTimer watermark from advancing
past the set OutputTime until Processing Time has advanced to at least the
provided instant to execute the timer's callback. This can be used to allow
the runner to constrain pipeline throughput with user guidance."

I'd probably add that a timer with an output time outside of the window
would not be guaranteed to fire, and that OnWindowExpiry is the correct way
to ensure cleanup occurs.
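
A minimal sketch of the hold described in that definition, in plain Python 
rather than any Beam API. DelayTimer here is a hypothetical dataclass and 
advance_watermark a hypothetical helper; the sketch assumes the callback 
runs, and the hold is released, at the moment processing time reaches the 
timer's instant.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DelayTimer:
    """Hypothetical model of the proposed timer, not a Beam class."""
    fire_at: float      # processing-time instant at which work may continue
    output_time: float  # event-time timestamp the timer holds


def advance_watermark(
    candidate: float,
    processing_time: float,
    timers: List[DelayTimer],
) -> float:
    """Clamp a candidate event-time watermark by all pending delay timers."""
    # A timer holds the watermark only while its instant is still in the future.
    holds = [t.output_time for t in timers if processing_time < t.fire_at]
    return min([candidate, *holds])
```

For example, with a single DelayTimer(fire_at=10, output_time=100), a 
candidate watermark of 500 is clamped to 100 while processing time is below 
10 and passes through unchanged afterwards.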

No solution to the Looping Timers on Drain problem here, but I think that's
ultimately an orthogonal discussion, and will restrain my thoughts on that
for now.

This isn't a proposal, but exploring the solution space within our problem.
We'd want to break down exactly what is different and what is the same for
the 3 kinds of timers...




On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles  wrote:

> Pulling out focus points:
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at
> some time in the processing-time future.
>
> I like this as a clear and internally-consistent feature description. It
> describes ProcessContinuation and those timers which serve the same purpose
> as ProcessContinuation.
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
> > I can't think of a batch or streaming scenario where it would be correct
> to not wait at least that long
>
> The main reason we created timers: to take action in the absence of data.
> The archetypal use case for processing time timers was/is "flush data from
> state if it has been sitting there too long". For this use case, the right
> behavior for batch is to skip the timer. It is actually basically incorrect
> to wait.
>
> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> > It doesn't require a new primitive.
>
> IMO what's being proposed *is* a new primitive. I think it is a good
> primitive. It is the underlying primitive to ProcessContinuation. It
> would be user-friendly as a kind of timer. But if we made this the behavior
> of processing time timers retroactively, it would break everyone using them
> to flush data who is also reprocessing data.
>
> There's two very different use cases ("I need to wait, and block data" vs
> "I want to act without data, aka NOT wait for data") and I think we should
> serve both of them, but it doesn't have to be with the same low-level
> feature.
>
> Kenn
>
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
> dev@beam.apache.org> wrote:
>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>> >
>> > While I'm currently on the other side of the fence, I would not be
>> against changing/requiring the semantics of ProcessingTime constructs to be
>> "must wait and execute" as such a solution, and enables the Proposed
>> "batch" process continuation throttling mechanism to work as hypothesized
>> for both "batch" and "streaming" execution.
>> >
>> > There's a lot to like, as it leans Beam further into the unification of
>> Batch and Stream, with one fewer exception (eg. unifies timer experience
>> further). It doesn't require a new primitive. It probably matches more with
>> user expectations anyway.
>> >
>> > It does cause looping timer execution with processing time to be a
>> problem for Drains however.
>>
>> I think we have a problem with looping timers plus drain (a mostly
>> streaming idea anyway) regardless.
>>
>> > I'd argue though that in the case of a drain, we could updated the
>> semantics as "move watermark to infinity"  "existing timers are executed,
>> but new timers are ignored",
>>
>> I don't like the idea of dropping timers for drain. I think correct
>> handling here requires user visibility into whether a pipeline is
>> draining or not.
>>
>> > and ensure/and update the requirements around OnWindowExpiration
>> callbacks to be a bit more insistent on being implemented for correct
>> execution, which is currently the only "hard" signal to the SDK side that
>> the window's work is guaranteed to be over, and remaining state needs to be
>> addressed by the transform or be garbage collected. This remains critical
>> for developing a good pattern for ProcessingTime timers within a Global
>> Window too.
>>
>> +1
>>
>> >
>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>> > > Thanks for bringing this up.
>> > >
>> > > My position is that both batch 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-26 Thread Kenneth Knowles
Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
wrote:
> I can't act on something yet [...] but I expect to be able to [...] at
some time in the processing-time future.

I like this as a clear and internally-consistent feature description. It
describes ProcessContinuation and those timers which serve the same purpose
as ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
wrote:
> I can't think of a batch or streaming scenario where it would be correct
to not wait at least that long

The main reason we created timers: to take action in the absence of data.
The archetypal use case for processing time timers was/is "flush data from
state if it has been sitting there too long". For this use case, the right
behavior for batch is to skip the timer. It is actually basically incorrect
to wait.

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> It doesn't require a new primitive.

IMO what's being proposed *is* a new primitive. I think it is a good
primitive. It is the underlying primitive to ProcessContinuation. It would
be user-friendly as a kind of timer. But if we made this the behavior of
processing time timers retroactively, it would break everyone using them to
flush data who is also reprocessing data.

There are two very different use cases ("I need to wait, and block data" vs
"I want to act without data, aka NOT wait for data") and I think we should
serve both of them, but it doesn't have to be with the same low-level
feature.

Kenn


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
wrote:

> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
> >
> > While I'm currently on the other side of the fence, I would not be
> against changing/requiring the semantics of ProcessingTime constructs to be
> "must wait and execute" as such a solution, and enables the Proposed
> "batch" process continuation throttling mechanism to work as hypothesized
> for both "batch" and "streaming" execution.
> >
> > There's a lot to like, as it leans Beam further into the unification of
> Batch and Stream, with one fewer exception (eg. unifies timer experience
> further). It doesn't require a new primitive. It probably matches more with
> user expectations anyway.
> >
> > It does cause looping timer execution with processing time to be a
> problem for Drains however.
>
> I think we have a problem with looping timers plus drain (a mostly
> streaming idea anyway) regardless.
>
> > I'd argue though that in the case of a drain, we could updated the
> semantics as "move watermark to infinity"  "existing timers are executed,
> but new timers are ignored",
>
> I don't like the idea of dropping timers for drain. I think correct
> handling here requires user visibility into whether a pipeline is
> draining or not.
>
> > and ensure/and update the requirements around OnWindowExpiration
> callbacks to be a bit more insistent on being implemented for correct
> execution, which is currently the only "hard" signal to the SDK side that
> the window's work is guaranteed to be over, and remaining state needs to be
> addressed by the transform or be garbage collected. This remains critical
> for developing a good pattern for ProcessingTime timers within a Global
> Window too.
>
> +1
>
> >
> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > > Thanks for bringing this up.
> > >
> > > My position is that both batch and streaming should wait for
> > > processing time timers, according to local time (with the exception of
> > > tests that can accelerate this via faked clocks).
> > >
> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> > > isomorphic, and can be implemented in terms of each other (at least in
> > > one direction, and likely the other). Both are an indication that I
> > > can't act on something yet due to external constraints (e.g. not all
> > > the data has been published, or I lack sufficient capacity/quota to
> > > push things downstream) but I expect to be able to (or at least would
> > > like to check again) at some time in the processing-time future. I
> > > can't think of a batch or streaming scenario where it would be correct
> > > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> > > tailing logs and was eagerly started before they were fully written,
> > > or waiting for some kind of (non-data-dependent) quiessence or other
> > > operation to finish).
> > >
> > >
> > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
> > > >
> > > > For me it always helps to seek analogy in our physical reality.
> Stream
> > > > processing actually has quite a good analogy for both event-time and
> > > > processing-time - the simplest model for this being relativity
> theory.
> > > > Event-time is the time at which events occur _at distant locations_.
> Due
> > > > to finite and invariant speed of light (which is actually really
> > > > involved in the explanation why any stream processing is inevitably
> > > > 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke  wrote:
>
> While I'm currently on the other side of the fence, I would not be against 
> changing/requiring the semantics of ProcessingTime constructs to be "must 
> wait and execute" as such a solution, and enables the Proposed "batch" 
> process continuation throttling mechanism to work as hypothesized for both 
> "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification of Batch 
> and Stream, with one fewer exception (eg. unifies timer experience further). 
> It doesn't require a new primitive. It probably matches more with user 
> expectations anyway.
>
> It does cause looping timer execution with processing time to be a problem 
> for Drains however.

I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.

> I'd argue though that in the case of a drain, we could updated the semantics 
> as "move watermark to infinity"  "existing timers are executed, but new 
> timers are ignored",

I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.

> and ensure/and update the requirements around OnWindowExpiration callbacks to 
> be a bit more insistent on being implemented for correct execution, which is 
> currently the only "hard" signal to the SDK side that the window's work is 
> guaranteed to be over, and remaining state needs to be addressed by the 
> transform or be garbage collected. This remains critical for developing a 
> good pattern for ProcessingTime timers within a Global Window too.

+1

>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for
> > processing time timers, according to local time (with the exception of
> > tests that can accelerate this via faked clocks).
> >
> > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> > isomorphic, and can be implemented in terms of each other (at least in
> > one direction, and likely the other). Both are an indication that I
> > can't act on something yet due to external constraints (e.g. not all
> > the data has been published, or I lack sufficient capacity/quota to
> > push things downstream) but I expect to be able to (or at least would
> > like to check again) at some time in the processing-time future. I
> > can't think of a batch or streaming scenario where it would be correct
> > to not wait at least that long (even in batch inputs, e.g. suppose I'm
> > tailing logs and was eagerly started before they were fully written,
> > or waiting for some kind of (non-data-dependent) quiessence or other
> > operation to finish).
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
> > >
> > > For me it always helps to seek analogy in our physical reality. Stream
> > > processing actually has quite a good analogy for both event-time and
> > > processing-time - the simplest model for this being relativity theory.
> > > Event-time is the time at which events occur _at distant locations_. Due
> > > to finite and invariant speed of light (which is actually really
> > > involved in the explanation why any stream processing is inevitably
> > > unordered) these events are observed (processed) at different times
> > > (processing time, different for different observers). It is perfectly
> > > possible for an observer to observe events at a rate that is higher than
> > > one second per second. This also happens in reality for observers that
> > > travel at relativistic speeds (which might be an analogy for fast -
> > > batch - (re)processing). Besides the invariant speed, there is also
> > > another invariant - local clock (wall time) always ticks exactly at the
> > > rate of one second per second, no matter what. It is not possible to
> > > "move faster or slower" through (local) time.
> > >
> > > In my understanding the reason why we do not put any guarantees or
> > > bounds on the delay of firing processing time timers is purely technical
> > > - the processing is (per key) single-threaded, thus any timer has to
> > > wait before any element processing finishes. This is only consequence of
> > > a technical solution, not something fundamental.
> > >
> > > Having said that, my point is that according to the above analogy, it
> > > should be perfectly fine to fire processing time timers in batch based
> > > on (local wall) time only. There should be no way of manipulating this
> > > local time (excluding tests). Watermarks should be affected the same way
> > > as any buffering in a state that would happen in a stateful DoFn (i.e.
> > > set timer holds output watermark). We should probably pay attention to
> > > looping timers, but it seems possible to define a valid stopping
> > > condition (input watermark at infinity).
> > >
> > >   Jan
> > >
> > > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Burke
While I'm currently on the other side of the fence, I would not be against 
changing/requiring the semantics of ProcessingTime constructs to be "must wait 
and execute", as such a solution enables the proposed "batch" process 
continuation throttling mechanism to work as hypothesized for both "batch" and 
"streaming" execution. 

There's a lot to like, as it leans Beam further into the unification of Batch 
and Stream, with one fewer exception (e.g. it unifies the timer experience 
further). It doesn't require a new primitive. It probably matches more with 
user expectations anyway.

It does cause looping timer execution with processing time to be a problem for 
Drains however.

I'd argue though that in the case of a drain, we could update the semantics to 
"move watermark to infinity" and "existing timers are executed, but new timers 
are ignored", and update the requirements around OnWindowExpiration 
callbacks to be a bit more insistent on being implemented for correct 
execution, which is currently the only "hard" signal to the SDK side that the 
window's work is guaranteed to be over, and remaining state needs to be 
addressed by the transform or be garbage collected. This remains critical for 
developing a good pattern for ProcessingTime timers within a Global Window too.

On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> Thanks for bringing this up.
> 
> My position is that both batch and streaming should wait for
> processing time timers, according to local time (with the exception of
> tests that can accelerate this via faked clocks).
> 
> Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
> isomorphic, and can be implemented in terms of each other (at least in
> one direction, and likely the other). Both are an indication that I
> can't act on something yet due to external constraints (e.g. not all
> the data has been published, or I lack sufficient capacity/quota to
> push things downstream) but I expect to be able to (or at least would
> like to check again) at some time in the processing-time future. I
> can't think of a batch or streaming scenario where it would be correct
> to not wait at least that long (even in batch inputs, e.g. suppose I'm
> tailing logs and was eagerly started before they were fully written,
> or waiting for some kind of (non-data-dependent) quiessence or other
> operation to finish).
> 
> 
> On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
> >
> > For me it always helps to seek analogy in our physical reality. Stream
> > processing actually has quite a good analogy for both event-time and
> > processing-time - the simplest model for this being relativity theory.
> > Event-time is the time at which events occur _at distant locations_. Due
> > to finite and invariant speed of light (which is actually really
> > involved in the explanation why any stream processing is inevitably
> > unordered) these events are observed (processed) at different times
> > (processing time, different for different observers). It is perfectly
> > possible for an observer to observe events at a rate that is higher than
> > one second per second. This also happens in reality for observers that
> > travel at relativistic speeds (which might be an analogy for fast -
> > batch - (re)processing). Besides the invariant speed, there is also
> > another invariant - local clock (wall time) always ticks exactly at the
> > rate of one second per second, no matter what. It is not possible to
> > "move faster or slower" through (local) time.
> >
> > In my understanding the reason why we do not put any guarantees or
> > bounds on the delay of firing processing time timers is purely technical
> > - the processing is (per key) single-threaded, thus any timer has to
> > wait before any element processing finishes. This is only consequence of
> > a technical solution, not something fundamental.
> >
> > Having said that, my point is that according to the above analogy, it
> > should be perfectly fine to fire processing time timers in batch based
> > on (local wall) time only. There should be no way of manipulating this
> > local time (excluding tests). Watermarks should be affected the same way
> > as any buffering in a state that would happen in a stateful DoFn (i.e.
> > set timer holds output watermark). We should probably pay attention to
> > looping timers, but it seems possible to define a valid stopping
> > condition (input watermark at infinity).
> >
> >   Jan
> >
> > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > Forking this thread.
> > >
> > > The state of processing time timers in this mode of processing is not
> > > satisfactory and is discussed a lot but we should make everything
> > > explicit.
> > >
> > > Currently, a state and timer DoFn has a number of logical watermarks:
> > > (apologies for fixed width not coming through in email lists). Treat
> > > timers as a back edge.
> > >
> > > input --(A)(C)--> ParDo(DoFn) (D)---> output
> > > ^   

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Robert Bradshaw via dev
Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiescence or other
operation to finish).
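
As an illustration of "wait according to local time, except in tests with 
faked clocks", here is a small sketch in plain Python. TimerService and 
ManualClock are hypothetical names, not Beam classes; the only assumption 
is that the component firing timers reads time through an injected 
callable instead of the system clock directly.

```python
import heapq
import itertools
from typing import Callable, List, Tuple


class TimerService:
    """Fires callbacks once the injected clock reaches their instant."""

    def __init__(self, now: Callable[[], float]) -> None:
        self._now = now
        self._heap: List[Tuple[float, int, Callable[[], None]]] = []
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared

    def set_timer(self, fire_at: float, callback: Callable[[], None]) -> None:
        heapq.heappush(self._heap, (fire_at, next(self._seq), callback))

    def fire_due(self) -> int:
        """Run every timer whose instant has been reached; return how many ran."""
        fired = 0
        while self._heap and self._heap[0][0] <= self._now():
            _, _, callback = heapq.heappop(self._heap)
            callback()
            fired += 1
        return fired


class ManualClock:
    """Faked clock for tests: time moves only when advance() is called."""

    def __init__(self, start: float = 0.0) -> None:
        self.t = start

    def __call__(self) -> float:
        return self.t

    def advance(self, seconds: float) -> None:
        self.t += seconds
```

In production the service would be constructed with a real clock such as 
time.time, so timers wait for wall time; a test passes a ManualClock and 
advances it explicitly, so nothing actually sleeps.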


On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský  wrote:
>
> For me it always helps to seek analogy in our physical reality. Stream
> processing actually has quite a good analogy for both event-time and
> processing-time - the simplest model for this being relativity theory.
> Event-time is the time at which events occur _at distant locations_. Due
> to finite and invariant speed of light (which is actually really
> involved in the explanation why any stream processing is inevitably
> unordered) these events are observed (processed) at different times
> (processing time, different for different observers). It is perfectly
> possible for an observer to observe events at a rate that is higher than
> one second per second. This also happens in reality for observers that
> travel at relativistic speeds (which might be an analogy for fast -
> batch - (re)processing). Besides the invariant speed, there is also
> another invariant - local clock (wall time) always ticks exactly at the
> rate of one second per second, no matter what. It is not possible to
> "move faster or slower" through (local) time.
>
> In my understanding the reason why we do not put any guarantees or
> bounds on the delay of firing processing time timers is purely technical
> - the processing is (per key) single-threaded, thus any timer has to
> wait before any element processing finishes. This is only consequence of
> a technical solution, not something fundamental.
>
> Having said that, my point is that according to the above analogy, it
> should be perfectly fine to fire processing time timers in batch based
> on (local wall) time only. There should be no way of manipulating this
> local time (excluding tests). Watermarks should be affected the same way
> as any buffering in a state that would happen in a stateful DoFn (i.e.
> set timer holds output watermark). We should probably pay attention to
> looping timers, but it seems possible to define a valid stopping
> condition (input watermark at infinity).
>
>   Jan
>
> On 2/22/24 19:50, Kenneth Knowles wrote:
> > Forking this thread.
> >
> > The state of processing time timers in this mode of processing is not
> > satisfactory and is discussed a lot but we should make everything
> > explicit.
> >
> > Currently, a state and timer DoFn has a number of logical watermarks:
> > (apologies for fixed width not coming through in email lists). Treat
> > timers as a back edge.
> >
> > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> >             ^                      |
> >             |---------(B)----------|
> >                      timers
> >
> > (A) Input Element watermark: this is the watermark that promises there
> > is no incoming element with a timestamp earlier than it. Each input
> > element's timestamp holds this watermark. Note that *event time timers
> > firing is according to this watermark*. But a runner commits changes
> > to this watermark *whenever it wants*, in a way that can be
> > consistent. So the runner can absolutely process *all* the elements
> > before advancing the watermark (A), and only afterwards start firing
> > timers.
> >
> > (B) Timer watermark: this is a watermark that promises no timer is set
> > with an output timestamp earlier than it. Each timer that has an
> > output timestamp holds this watermark. Note that timers can set new
> > timers, indefinitely, so this may never reach infinity even in a drain
> > scenario.
> >
> > (C) (derived) total input watermark: this is a watermark that is the
> > minimum of the two above, and ensures that all state for the DoFn for
> > expired windows can be GCd after calling @OnWindowExpiration.
> >
> > (D) output watermark: this is a promise that the DoFn will not output
> > earlier than the watermark. It is held by the total input watermark.
> >
> > So any timer, processing or not, holds the total input watermark
> > which prevents window GC, hence the timer must be fired. You can set
> > timers without a timestamp and they will not hold (B) hence not 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-23 Thread Jan Lukavský
For me it always helps to seek analogy in our physical reality. Stream 
processing actually has quite a good analogy for both event-time and 
processing-time - the simplest model for this being relativity theory. 
Event-time is the time at which events occur _at distant locations_. Due 
to the finite and invariant speed of light (which is actually really 
involved in the explanation why any stream processing is inevitably 
unordered) these events are observed (processed) at different times 
(processing time, different for different observers). It is perfectly 
possible for an observer to observe events at a rate that is higher than 
one second per second. This also happens in reality for observers that 
travel at relativistic speeds (which might be an analogy for fast - 
batch - (re)processing). Besides the invariant speed, there is also 
another invariant - local clock (wall time) always ticks exactly at the 
rate of one second per second, no matter what. It is not possible to 
"move faster or slower" through (local) time.


In my understanding the reason why we do not put any guarantees or 
bounds on the delay of firing processing time timers is purely technical 
- the processing is (per key) single-threaded, thus any timer has to 
wait until any element processing finishes. This is only a consequence of 
a technical solution, not something fundamental.


Having said that, my point is that according to the above analogy, it 
should be perfectly fine to fire processing time timers in batch based 
on (local wall) time only. There should be no way of manipulating this 
local time (excluding tests). Watermarks should be affected the same way 
as any buffering in a state that would happen in a stateful DoFn (i.e. 
set timer holds output watermark). We should probably pay attention to 
looping timers, but it seems possible to define a valid stopping 
condition (input watermark at infinity).
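
A minimal sketch of the stopping condition mentioned above, assuming a 
looping timer that re-arms itself on every firing until it observes the 
input watermark at infinity. The names run_looping_timer and END_OF_INPUT 
are hypothetical and not part of any Beam API; firing times are plain 
numbers standing in for local wall time.

```python
import math

# Hypothetical stand-in for "input watermark at infinity".
END_OF_INPUT = math.inf


def run_looping_timer(input_watermarks, period=1.0):
    """Re-arm a periodic timer until the input watermark reaches infinity.

    input_watermarks: the input watermark observed at each successive firing.
    Returns the list of (local) times at which the timer fired.
    """
    firings = []
    fire_at = period
    for watermark in input_watermarks:
        firings.append(fire_at)
        if watermark == END_OF_INPUT:
            break  # input is exhausted: do not re-arm, so the loop ends
        fire_at += period  # otherwise set the next timer one period later
    return firings
```

The timer fires one last time after the input is exhausted and then stops, 
so the loop cannot hold the watermark forever.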


 Jan

On 2/22/24 19:50, Kenneth Knowles wrote:

Forking this thread.

The state of processing time timers in this mode of processing is not 
satisfactory and is discussed a lot but we should make everything 
explicit.


Currently, a state and timer DoFn has a number of logical watermarks: 
(apologies for fixed width not coming through in email lists). Treat 
timers as a back edge.


input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
            ^                      |
            |---------(B)----------|
                     timers

(A) Input Element watermark: this is the watermark that promises there 
is no incoming element with a timestamp earlier than it. Each input 
element's timestamp holds this watermark. Note that *event time timers 
firing is according to this watermark*. But a runner commits changes 
to this watermark *whenever it wants*, in a way that can be 
consistent. So the runner can absolutely process *all* the elements 
before advancing the watermark (A), and only afterwards start firing 
timers.


(B) Timer watermark: this is a watermark that promises no timer is set 
with an output timestamp earlier than it. Each timer that has an 
output timestamp holds this watermark. Note that timers can set new 
timers, indefinitely, so this may never reach infinity even in a drain 
scenario.


(C) (derived) total input watermark: this is a watermark that is the 
minimum of the two above, and ensures that all state for the DoFn for 
expired windows can be GCd after calling @OnWindowExpiration.


(D) output watermark: this is a promise that the DoFn will not output 
earlier than the watermark. It is held by the total input watermark.
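
The relationship between (A) through (D) can be sketched as follows. This 
is an illustration only, assuming watermarks are plain numbers and that 
nothing other than (C) holds the output watermark; the function name 
logical_watermarks is hypothetical and does not correspond to any runner's 
implementation.

```python
import math


def logical_watermarks(pending_element_timestamps, timer_output_timestamps):
    """Derive the four logical watermarks of a stateful DoFn.

    Both arguments are iterables of timestamps that currently hold a
    watermark. An empty iterable means nothing holds it, so it is +inf.
    """
    a = min(pending_element_timestamps, default=math.inf)  # (A) input elements
    b = min(timer_output_timestamps, default=math.inf)     # (B) timers
    c = min(a, b)                                          # (C) total input
    d = c  # (D) output: held by (C); simplification, no other holds modeled
    return {"A": a, "B": b, "C": c, "D": d}
```

For example, a timer with output timestamp 3 keeps (C) and (D) at 3 even 
when the earliest pending element is at 5.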


So any timer, processing or not, holds the total input watermark 
which prevents window GC, hence the timer must be fired. You can set 
timers without a timestamp and they will not hold (B) hence not hold 
the total input / GC watermark (C). Then if a timer fires for an 
expired window, it is ignored. But in general a timer that sets an 
output timestamp is saying that it may produce output, so it *must* be 
fired, even in batch, for data integrity. There was a time before 
timers had output timestamps that we said that you *always* have to 
have an @OnWindowExpiration callback for data integrity, and 
processing time timers could not hold the watermark. That is changed now.


One main purpose of processing time timers in streaming is to be a 
"timeout" for data buffered in state, to eventually flush. In this 
case the output timestamp should be the minimum of the elements in 
state (or equivalent). In batch, of course, this kind of timer is not 
relevant and we should definitely not wait for it, because the goal is 
to just get through all the data. We can justify this by saying that 
the worker really has no business having any idea what time it really 
is, and the runner can just run the clock at whatever speed it wants.


Another purpose, brought up on the Throttle thread, is to wait or 
backoff. In this case it would be desired for the timer to actually 
cause batch processing to pause 

Re: [DISCUSS] Processing time timers in "batch" (faster-than-wall-time [re]processing)

2024-02-22 Thread Robert Burke
This is a "timely" discussion because my next step for Prism is to address 
ProcessingTime.

The description of the watermarks matches my understanding and how it's 
implemented so far in Prism [0], where the "stage" contains one or more 
transforms to be executed by a worker.

My current thinking on processing time is in the issue tracker [1], largely 
focused on quite the opposite case from throttling: ensuring fast 
execution for pipelines with TestStream. As TestStream is for tests, and tests 
should execute quickly, there's no reason to do anything but synthetically 
advance the processing time. However, this only gates the Runner actions for 
processing time, not the Worker/SDK actions for processing time.
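
As a rough sketch of what "synthetically advance the processing time" could 
mean on the runner side, the clock below jumps straight to the earliest 
pending timer instead of sleeping. SyntheticClock and its methods are 
hypothetical names for illustration and are not taken from Prism.

```python
import heapq


class SyntheticClock:
    """A runner-side clock that skips ahead to the next pending timer."""

    def __init__(self, start=0.0):
        self.now = start
        self._pending = []  # min-heap of (fire_time, tag)

    def set_timer(self, fire_time, tag):
        heapq.heappush(self._pending, (fire_time, tag))

    def advance_to_next(self):
        """Jump to the earliest pending timer and return its tag, or None."""
        if not self._pending:
            return None
        fire_time, tag = heapq.heappop(self._pending)
        self.now = max(self.now, fire_time)  # the clock never moves backwards
        return tag
```

A timer set an hour ahead fires immediately from the test's point of view. 
As noted above, this only changes the runner's notion of time; the SDK 
worker still reads its own clock.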

One thing I noted during my explorations is that there are two places 
ProcessingTime is invoked: a relatively scheduled resume for 
ProcessContinuations, and an absolute time for ProcessingTime timers. It's much 
easier to ignore a relative time, but absolute times are a bit harder, since 
they're never going to be based on the Runner time, which will be skewed from 
SDK time, since there's no passing of processing time from Runner to SDK.

I agree that the main purpose of ProcessingTime timers is to timeout state for 
"Streaming" execution, and similarly having OnWindowExpiration for guaranteeing 
any state is addressed for EventTime timer handling within a window. I also 
agree that a "Batch" execution shouldn't wait for ProcessingTime Timers, but 
should still execute OnWindowExpirations. Notably, the existing behavior of a 
ProcessingTime timer is not to block execution, but to schedule potential 
execution. In other words, it would be wrong to block.

Similarly, ProcessContinuations only declare a suggested resume time. It's 
still up to the DoFn returning the ProcessContinuation, assuming it's time 
dependent, to actually check the time for its desired behavior. It's not a 
block, but an indication of when additional work might be available, and that 
it's probably a waste of time for the runner to schedule the work sooner than 
the recommended delay.
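
To illustrate the "suggestion, not a block" point, here is a small sketch in 
which the runner may wake the DoFn earlier or later than suggested and the 
DoFn re-checks the time itself on every attempt. poll_once and drive are 
hypothetical helpers, not the Beam ProcessContinuation API.

```python
def poll_once(now, data_ready_at, suggested_delay=30.0):
    """One attempt: process if the time has come, else suggest a resume delay."""
    if now >= data_ready_at:
        return ("processed", None)
    return ("resume", suggested_delay)


def drive(data_ready_at, wake_times):
    """Call poll_once at each runner-chosen wake time; return attempts used."""
    attempts = 0
    for now in wake_times:
        attempts += 1
        status, _ = poll_once(now, data_ready_at)
        if status == "processed":
            break
    return attempts
```

Waking early costs an extra attempt but is never incorrect, because the 
check lives in the DoFn rather than in the scheduling.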

What's lacking is a Beam notion of Runner directed cross worker global state I 
think.

I don't know what that looks like exactly, though, in a way that would be useful for 
more than simply a throttle. One could imagine a Special transform that is 
periodically executed on SDK workers in response to something and a Special 
SideInput that is how that information is propagated to other transforms (like 
the throttle transform). But that just sounds like a variant of Slowly Changing 
SideInputs, instead of allowing the Special transform to direct the runner's 
sharding and management of some other transforms. Hard to see how useful that 
is outside of the throttle though.

We could add a Block primitive, that does exactly that. Similar to timers, but 
execution SDK side is held until the Runner sends an Unblock signal for a given 
bundle instruction+blockID combo back to the SDK. But again that seems only 
useful for a central throttling notion. Technically Google's internal Flume 
batch processor has the notion of a FlumeThrottle to solve exactly this problem.

I'd be happiest if we could figure out a less operationally specific primitive, 
but if not, a token bucket based BeamThrottle would be useful in batch and 
streaming, and shouldn't be too difficult to add to most runners and SDKs 
(though the amount of work will of course vary).
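
For reference, the core of a token bucket can be sketched as below. This 
shows only the local accounting, assuming the caller passes in the current 
time; how the bucket would be shared or coordinated across workers is 
exactly the open question above. TokenBucket is a hypothetical class, not 
an existing Beam or Flume API.

```python
class TokenBucket:
    """Refill `rate` tokens per second up to `capacity`; spend on acquire."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full
        self.last = now

    def try_acquire(self, now, cost=1.0):
        """Return True and spend `cost` tokens if enough are available."""
        elapsed = max(0.0, now - self.last)  # ignore clock regressions
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

A caller that gets False would back off and retry later, which is where 
the processing-time timer (or a Block-like primitive) would come in.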

I've gotten away from the core topic. My opinion is "ProcessingTime Timers 
Shouldn't Block Execution" and "We should figure out the best central primitive 
to manage this class of concept".

Robert Burke
Beam Go Busybody

[0] 
https://github.com/apache/beam/blob/11f9bce485c4f6fe466ff4bf5073d2414e43678c/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1253-L1331
[1] https://github.com/apache/beam/issues/30083


On 2024/02/22 18:50:10 Kenneth Knowles wrote:
> Forking this thread.
> 
> The state of processing time timers in this mode of processing is not
> satisfactory and is discussed a lot but we should make everything explicit.
> 
> Currently, a state and timer DoFn has a number of logical watermarks:
> (apologies for fixed width not coming through in email lists). Treat timers
> as a back edge.
> 
> input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
>             ^                      |
>             |---------(B)----------|
>                      timers
> 
> 
> (A) Input Element watermark: this is the watermark that promises there is
> no incoming element with a timestamp earlier than it. Each input element's
> timestamp holds this watermark. Note that *event time timers firing is
> according to this watermark*. But a runner commits changes to this
> watermark *whenever it wants*, in a way that can be consistent. So the
> runner can absolutely process *all* the elements before advancing the
> watermark (A), and only afterwards start firing timers.
> 
> (B) Timer watermark: this is