Re: Default output timestamp of processing-time timers
On Wed, Jan 19, 2022 at 1:11 AM Jan Lukavský wrote: > > One note - some people definitely use timer.withOutputTimestamp as a > watermark hold. > > Definitely. > > > In fact, I do not view a "watermark hold" as a fundamental concept. The > act of "set a timer with the intent that I am allowed to produce output > with timestamp X" is the fundamental concept, and watermark hold is an > implementation detail that should really never have been surfaced as an > end-user concept, or really even as an SDK author concept. > > Agree that this need not be exposed explicitly, but the given the > causality-preserving invariant that elements arriving *before* watermark > *must not* leave after watermark I think that .withOutputTimestamp actually > defines watermark hold implicitly. I think there is no other valid > implementation than to hold output watermark not to cross the output > timestamp of any active per-key timer (actually, we could distinguish cases > when the timer is set for already late elements, there is no need - or > possibility - to hold the watermark). > > I'd be also supportive for associating any buffer output timestamp with > timer, rather than the buffer itself, as that really feels like a better > description of what is *really* going to happen. > Is this just a way to connect the state, timer callback, and process element. I wonder how it looks different or what we could do better with this information. (I like these sorts of ideas, but I can't think of how it would be different) In the case Reuven described, where the timer callback does nothing, there seems to be a real risk that data is left behind in the buffer when the watermark hold is released. So you could, for example, have a timer callback that always must accept the full contents of the buffer, and where it is obvious to a user that the buffer is cleared after the callback. Like OnWindowExpiration but OnBufferEviction. 
> This was probably discussed, but I cannot see this in this discussion, > what keeps us from setting output timestamp of processing-time timer to > something like min(endOfWindow, currentOutputWatermark)? Yes, output > watermark is not stable, but anything that is derived from _processing > time_ is not stable by definition. For on-time elements, outputWatermark > gives an estimation of the current position in event-time, so it makes > sense to me to use that. Are there any counter examples? > This seems OK to me. Certainly the hold should never be based on processing time. Kenn > Jan > On 1/18/22 21:10, Kenneth Knowles wrote: > > Yea, it makes sense. This is an issue for the global window where there > isn't automatic cleanup of state. I've had a few user cases where they > would like a good way of doing state cleanup in the global window too - > something where whenever state gets buffer there is always a finite timer > that will fire. There might be an opportunity here, if we attach the hold > to that associated timer rather than the state. It sounds similar to what > you describe where someone made a timer just to create a watermark hold > associated with some state - I assume they actually do need to process and > emit that state in some way related to the timer. > > On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax wrote: > >> Correct. >> >> IIRC originally we didn't want to add "buffered data timestamps" >> because it was error prone. Leaking even one record in state holds up the >> watermark and can cause the entire pipeline to grind to a halt. Associating >> with a timer guarantees that holds are always cleared eventually. >> >> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles wrote: >> >>> This is an interesting case, and a legitimate counterexample to >>> consider. I'd call it a workaround :-). The semantic thing they would >>> want/need is "output timestamp" associated with buffered data (also >>> implemented with watermark hold). 
I do know systems that designed their >>> state with this built in. >>> >>> Kenn >>> >>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax wrote: >>> One note - some people definitely use timer.withOutputTimestamp as a watermark hold. >>> This is a scenario in which one outputs (from processElement) a timestamp behind the current input element timestamp but knows that it is safe because there is already an extent timer with an earlier output timestamp (state can be used for this). In this case I've seen timers set simply for the hold - the actual onTimer never outputs anything. Reuven On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles wrote: > > > On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz > wrote: > >> > I think this wouldn't be very robust to different situations where >> processing time and event time may not be that close to each other. >> >> if you do something like `min(endOfWindow, max(eventInputTimestamp, >> computedFiringTimestamp))` the worst case is that you set a
Re: Default output timestamp of processing-time timers
> One note - some people definitely use timer.withOutputTimestamp as a watermark hold.

Definitely.

> In fact, I do not view a "watermark hold" as a fundamental concept. The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept.

Agree that this need not be exposed explicitly, but given the causality-preserving invariant that elements arriving *before* the watermark *must not* leave after the watermark, I think that .withOutputTimestamp actually defines a watermark hold implicitly. I think there is no other valid implementation than to hold the output watermark so that it does not cross the output timestamp of any active per-key timer (actually, we could distinguish cases where the timer is set for already late elements, where there is no need - or possibility - to hold the watermark).

I'd also be supportive of associating any buffer output timestamp with the timer, rather than the buffer itself, as that really feels like a better description of what is *really* going to happen.

This was probably discussed, but I cannot find it in this discussion: what keeps us from setting the output timestamp of a processing-time timer to something like min(endOfWindow, currentOutputWatermark)? Yes, the output watermark is not stable, but anything that is derived from _processing time_ is not stable by definition. For on-time elements, outputWatermark gives an estimate of the current position in event time, so it makes sense to me to use that. Are there any counterexamples?

Jan

On 1/18/22 21:10, Kenneth Knowles wrote: Yea, it makes sense. This is an issue for the global window where there isn't automatic cleanup of state.
I've had a few user cases where they would like a good way of doing state cleanup in the global window too - something where whenever state gets buffer there is always a finite timer that will fire. There might be an opportunity here, if we attach the hold to that associated timer rather than the state. It sounds similar to what you describe where someone made a timer just to create a watermark hold associated with some state - I assume they actually do need to process and emit that state in some way related to the timer. On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax wrote: Correct. IIRC originally we didn't want to add "buffered data timestamps" because it was error prone. Leaking even one record in state holds up the watermark and can cause the entire pipeline to grind to a halt. Associating with a timer guarantees that holds are always cleared eventually. On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles wrote: This is an interesting case, and a legitimate counterexample to consider. I'd call it a workaround :-). The semantic thing they would want/need is "output timestamp" associated with buffered data (also implemented with watermark hold). I do know systems that designed their state with this built in. Kenn On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax wrote: One note - some people definitely use timer.withOutputTimestamp as a watermark hold. This is a scenario in which one outputs (from processElement) a timestamp behind the current input element timestamp but knows that it is safe because there is already an extent timer with an earlier output timestamp (state can be used for this). In this case I've seen timers set simply for the hold - the actual onTimer never outputs anything. Reuven On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles wrote: On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz wrote: > I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. 
if you do something like `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))` the worst case is that you set a watermark hold for somewhere in the future, right? For example, if the watermark is lagging 3 hours, processing time = 4pm, event input = 1pm, window end = 5pm, the watermark hold/output time is set to 4pm + T. This would make the timestamps "newer" than the input, but shouldn't ever create late data, correct? Also, imo, the timestamps really already cross domains now, because the watermark (event time) is held until the (processing
Re: Default output timestamp of processing-time timers
Yea, it makes sense. This is an issue for the global window where there isn't automatic cleanup of state. I've had a few user cases where they would like a good way of doing state cleanup in the global window too - something where whenever state gets buffered there is always a finite timer that will fire. There might be an opportunity here, if we attach the hold to that associated timer rather than the state. It sounds similar to what you describe where someone made a timer just to create a watermark hold associated with some state - I assume they actually do need to process and emit that state in some way related to the timer.

On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax wrote:

> Correct.
>
> IIRC originally we didn't want to add "buffered data timestamps" because it was error prone. Leaking even one record in state holds up the watermark and can cause the entire pipeline to grind to a halt. Associating with a timer guarantees that holds are always cleared eventually.
>
> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles wrote:
>
>> This is an interesting case, and a legitimate counterexample to consider. I'd call it a workaround :-). The semantic thing they would want/need is "output timestamp" associated with buffered data (also implemented with watermark hold). I do know systems that designed their state with this built in.
>>
>> Kenn
>>
>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax wrote:
>>
>>> One note - some people definitely use timer.withOutputTimestamp as a watermark hold.
>>>
>>> This is a scenario in which one outputs (from processElement) a timestamp behind the current input element timestamp but knows that it is safe because there is already an extant timer with an earlier output timestamp (state can be used for this). In this case I've seen timers set simply for the hold - the actual onTimer never outputs anything.
>>> >>> Reuven >>> >>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles wrote: >>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz wrote: > > I think this wouldn't be very robust to different situations where > processing time and event time may not be that close to each other. > > if you do something like `min(endOfWindow, max(eventInputTimestamp, > computedFiringTimestamp))` the worst case is that you set a watermark hold > for somewhere in the future, right? For example, if the watermark is > lagging 3 hours, processing time = 4pm, event input = 1pm, window end = > 5pm, the watermark hold/output time is set to 4pm + T. This would make > the > timestamps "newer" than the input, but shouldn't ever create late data, > correct? > > Also, imo, the timestamps really already cross domains now, because > the watermark (event time) is held until the (processing time) timer > fires. > > The concrete issue that brought this up was a pipeline with some > state, and the state was "cleaned up" periodically with a processing time > timer that fired every ~hour. The author of the pipeline was confused why > the watermark wasn't moving (and thus GBKs firing, etc). The root cause > was the watermark being held by the timer. > > > It would just save you .withOutputTimestamp(elementTimestamp) on > your calls to setting the event time timer, right? > > Correct, the main thing I'm trying to solve is having to recalculate > an output timestamp using the same logic that the timer itself is using to > set its firing timestamp. > It sounds like the main use case that you are dealing with is the case where the timer doesn't actually produce output (or set further timers that produce output) so it doesn't need (or want) a watermark hold. That makes sense. In fact, I do not view a "watermark hold" as a fundamental concept. 
The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept. This is why in my proposal for adding output timestamps to timers, I called it "withOutputTimestamp", and this is why the design does not include any watermark holds - there is a self-loop on a transform where timers produce an input watermark distinct from the watermark on input elements, and that is enough. There is not now, and never has been, a need for the concept of a hold at the level of the Beam model. I wonder if we can automate this behavior by noticing that there is no OutputReceiver parameters to the timer callback, and also transitively. Or just work around it by saying ".withoutOutput" on the timer. Kenn > > > > On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles > wrote: >
Re: Default output timestamp of processing-time timers
This is an interesting case, and a legitimate counterexample to consider. I'd call it a workaround :-). The semantic thing they would want/need is "output timestamp" associated with buffered data (also implemented with watermark hold). I do know systems that designed their state with this built in. Kenn On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax wrote: > One note - some people definitely use timer.withOutputTimestamp as a > watermark hold. > > This is a scenario in which one outputs (from processElement) a timestamp > behind the current input element timestamp but knows that it is safe > because there is already an extent timer with an earlier output timestamp > (state can be used for this). In this case I've seen timers set simply for > the hold - the actual onTimer never outputs anything. > > Reuven > > On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles wrote: > >> >> >> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz >> wrote: >> >>> > I think this wouldn't be very robust to different situations where >>> processing time and event time may not be that close to each other. >>> >>> if you do something like `min(endOfWindow, max(eventInputTimestamp, >>> computedFiringTimestamp))` the worst case is that you set a watermark hold >>> for somewhere in the future, right? For example, if the watermark is >>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end = >>> 5pm, the watermark hold/output time is set to 4pm + T. This would make the >>> timestamps "newer" than the input, but shouldn't ever create late data, >>> correct? >>> >>> Also, imo, the timestamps really already cross domains now, because the >>> watermark (event time) is held until the (processing time) timer fires. >>> >>> The concrete issue that brought this up was a pipeline with some state, >>> and the state was "cleaned up" periodically with a processing time timer >>> that fired every ~hour. 
The author of the pipeline was confused why the >>> watermark wasn't moving (and thus GBKs firing, etc). The root cause was >>> the watermark being held by the timer. >>> >>> > It would just save you .withOutputTimestamp(elementTimestamp) on your >>> calls to setting the event time timer, right? >>> >>> Correct, the main thing I'm trying to solve is having to recalculate an >>> output timestamp using the same logic that the timer itself is using to set >>> its firing timestamp. >>> >> >> It sounds like the main use case that you are dealing with is the case >> where the timer doesn't actually produce output (or set further timers that >> produce output) so it doesn't need (or want) a watermark hold. That makes >> sense. >> >> In fact, I do not view a "watermark hold" as a fundamental concept. The >> act of "set a timer with the intent that I am allowed to produce output >> with timestamp X" is the fundamental concept, and watermark hold is an >> implementation detail that should really never have been surfaced as an >> end-user concept, or really even as an SDK author concept. This is why in >> my proposal for adding output timestamps to timers, I called it >> "withOutputTimestamp", and this is why the design does not include any >> watermark holds - there is a self-loop on a transform where timers produce >> an input watermark distinct from the watermark on input elements, and that >> is enough. There is not now, and never has been, a need for the concept of >> a hold at the level of the Beam model. >> >> I wonder if we can automate this behavior by noticing that there is no >> OutputReceiver parameters to the timer callback, and also transitively. Or >> just work around it by saying ".withoutOutput" on the timer. 
>> >> Kenn >> >> >>> >>> >>> >>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles wrote: >>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz wrote: > If I have a processing time timer, is there any way to automatically > set the output timestamp to the timer firing timestamp (similar to how > event-time timers work). > > A common use case would be to do something like: > timer.offset(X).align(Y).setRelative() > but have the output timestamp be the firing timestamp. In order to do > this now you need to re-calculate the output timestamp (using the same > logic as the timer does internally) and manually use withOutputTimestamp. I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. In general I'm skeptical of reusing timestamps across time domains, for just this sort of reason. I wouldn't recommend doing this manually either. > I'm not sure what the API would look like here, but it would also be > nice to allow event-time timers to do the same in reverse (use the element > input timestamp rather than the firing timestamp). Maybe something like > `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
Re: Default output timestamp of processing-time timers
On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz wrote:

>> I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other.
>
> If you do something like `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))` the worst case is that you set a watermark hold for somewhere in the future, right? For example, if the watermark is lagging 3 hours, processing time = 4pm, event input = 1pm, window end = 5pm, the watermark hold/output time is set to 4pm + T. This would make the timestamps "newer" than the input, but shouldn't ever create late data, correct?
>
> Also, imo, the timestamps really already cross domains now, because the watermark (event time) is held until the (processing time) timer fires.
>
> The concrete issue that brought this up was a pipeline with some state, and the state was "cleaned up" periodically with a processing time timer that fired every ~hour. The author of the pipeline was confused why the watermark wasn't moving (and thus GBKs firing, etc). The root cause was the watermark being held by the timer.
>
>> It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right?
>
> Correct, the main thing I'm trying to solve is having to recalculate an output timestamp using the same logic that the timer itself is using to set its firing timestamp.

It sounds like the main use case that you are dealing with is the case where the timer doesn't actually produce output (or set further timers that produce output) so it doesn't need (or want) a watermark hold. That makes sense.

In fact, I do not view a "watermark hold" as a fundamental concept. The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept. This is why in my proposal for adding output timestamps to timers, I called it "withOutputTimestamp", and this is why the design does not include any watermark holds - there is a self-loop on a transform where timers produce an input watermark distinct from the watermark on input elements, and that is enough. There is not now, and never has been, a need for the concept of a hold at the level of the Beam model.

I wonder if we can automate this behavior by noticing that there are no OutputReceiver parameters to the timer callback, and also transitively. Or just work around it by saying ".withoutOutput" on the timer.

Kenn

> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles wrote:
>
>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz wrote:
>>
>>> If I have a processing time timer, is there any way to automatically set the output timestamp to the timer firing timestamp (similar to how event-time timers work).
>>>
>>> A common use case would be to do something like:
>>> timer.offset(X).align(Y).setRelative()
>>> but have the output timestamp be the firing timestamp. In order to do this now you need to re-calculate the output timestamp (using the same logic as the timer does internally) and manually use withOutputTimestamp.
>>
>> I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. In general I'm skeptical of reusing timestamps across time domains, for just this sort of reason. I wouldn't recommend doing this manually either.
>>
>>> I'm not sure what the API would look like here, but it would also be nice to allow event-time timers to do the same in reverse (use the element input timestamp rather than the firing timestamp). Maybe something like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
>>
>> It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right? It doesn't work in general because a timer can be set from other OnTimer methods, where there is no "element" per se, but just the output timestamp of the fired timer.
>>
>> Kenn
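For readers following the "watermark held by the timer" case in this message, here is a toy model of the effect, not Beam runner code. It assumes (as a simplification) that the output watermark is the input watermark capped by the earliest output timestamp of any pending timer, and that the cleanup timer's output timestamp defaulted to the timestamp of the element that set it. The class name `WatermarkHoldSketch` and the method `outputWatermark` are made up for illustration.

```java
import java.time.Instant;
import java.util.List;

/** Toy model of an implicit watermark hold; names are hypothetical, not Beam APIs. */
final class WatermarkHoldSketch {

    /**
     * Assumed rule: the output watermark may not pass the output timestamp
     * of any timer that is still pending, nor the input watermark.
     */
    static Instant outputWatermark(Instant inputWatermark, List<Instant> pendingTimerOutputTimestamps) {
        Instant watermark = inputWatermark;
        for (Instant outputTimestamp : pendingTimerOutputTimestamps) {
            if (outputTimestamp.isBefore(watermark)) {
                watermark = outputTimestamp;
            }
        }
        return watermark;
    }

    public static void main(String[] args) {
        Instant inputWatermark = Instant.parse("2021-12-14T15:00:00Z");
        // Pending hourly cleanup timer, set while processing a 1pm element.
        Instant cleanupTimerOutput = Instant.parse("2021-12-14T13:00:00Z");

        // While the timer is pending, the output watermark is stuck at 1pm.
        System.out.println(outputWatermark(inputWatermark, List.of(cleanupTimerOutput)));
        // Once the timer has fired (nothing pending), the watermark can advance.
        System.out.println(outputWatermark(inputWatermark, List.of()));
    }
}
```

Under this model a timer that never produces output still pins the watermark until it fires, which is the situation a ".withoutOutput"-style option would avoid.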
Re: Default output timestamp of processing-time timers
> I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other.

If you do something like `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))` the worst case is that you set a watermark hold for somewhere in the future, right? For example, if the watermark is lagging 3 hours, processing time = 4pm, event input = 1pm, window end = 5pm, the watermark hold/output time is set to 4pm + T. This would make the timestamps "newer" than the input, but shouldn't ever create late data, correct?

Also, imo, the timestamps really already cross domains now, because the watermark (event time) is held until the (processing time) timer fires.

The concrete issue that brought this up was a pipeline with some state, and the state was "cleaned up" periodically with a processing time timer that fired every ~hour. The author of the pipeline was confused why the watermark wasn't moving (and thus GBKs firing, etc). The root cause was the watermark being held by the timer.

> It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right?

Correct, the main thing I'm trying to solve is having to recalculate an output timestamp using the same logic that the timer itself is using to set its firing timestamp.

On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles wrote:

> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz wrote:
>
>> If I have a processing time timer, is there any way to automatically set the output timestamp to the timer firing timestamp (similar to how event-time timers work).
>>
>> A common use case would be to do something like:
>> timer.offset(X).align(Y).setRelative()
>> but have the output timestamp be the firing timestamp. In order to do this now you need to re-calculate the output timestamp (using the same logic as the timer does internally) and manually use withOutputTimestamp.
>
> I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. In general I'm skeptical of reusing timestamps across time domains, for just this sort of reason. I wouldn't recommend doing this manually either.
>
>> I'm not sure what the API would look like here, but it would also be nice to allow event-time timers to do the same in reverse (use the element input timestamp rather than the firing timestamp). Maybe something like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
>
> It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right? It doesn't work in general because a timer can be set from other OnTimer methods, where there is no "element" per se, but just the output timestamp of the fired timer.
>
> Kenn
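The clamping formula from this message can be worked through with the 4pm / 1pm / 5pm numbers. The sketch below uses only `java.time`; the class and method names are hypothetical, the calendar date is arbitrary, and T is picked as 30 minutes and then 2 hours purely for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Worked example of min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)). */
final class ClampedOutputTimestamp {

    static Instant min(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
    }

    static Instant max(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    /** Never earlier than the input element, never later than the end of the window. */
    static Instant outputTimestamp(
            Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
        return min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp));
    }

    public static void main(String[] args) {
        Instant eventInput = Instant.parse("2021-12-14T13:00:00Z");     // event input = 1pm
        Instant processingTime = Instant.parse("2021-12-14T16:00:00Z"); // processing time = 4pm
        Instant windowEnd = Instant.parse("2021-12-14T17:00:00Z");      // window end = 5pm

        // T = 30 minutes: the firing timestamp (4:30pm) is inside the window and is used as-is.
        System.out.println(
                outputTimestamp(windowEnd, eventInput, processingTime.plus(Duration.ofMinutes(30))));
        // T = 2 hours: the firing timestamp (6pm) is past the window end, so it is clamped to 5pm.
        System.out.println(
                outputTimestamp(windowEnd, eventInput, processingTime.plus(Duration.ofHours(2))));
    }
}
```

In both cases the result is at or after the 1pm input timestamp, which is the "newer than the input, but not late" property the message asks about.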
Re: Default output timestamp of processing-time timers
On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz wrote:

> If I have a processing time timer, is there any way to automatically set the output timestamp to the timer firing timestamp (similar to how event-time timers work).
>
> A common use case would be to do something like:
> timer.offset(X).align(Y).setRelative()
> but have the output timestamp be the firing timestamp. In order to do this now you need to re-calculate the output timestamp (using the same logic as the timer does internally) and manually use withOutputTimestamp.

I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. In general I'm skeptical of reusing timestamps across time domains, for just this sort of reason. I wouldn't recommend doing this manually either.

> I'm not sure what the API would look like here, but it would also be nice to allow event-time timers to do the same in reverse (use the element input timestamp rather than the firing timestamp). Maybe something like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?

It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right? It doesn't work in general because a timer can be set from other OnTimer methods, where there is no "element" per se, but just the output timestamp of the fired timer.

Kenn
Default output timestamp of processing-time timers
If I have a processing time timer, is there any way to automatically set the output timestamp to the timer firing timestamp (similar to how event-time timers work)?

A common use case would be to do something like:

timer.offset(X).align(Y).setRelative()

but have the output timestamp be the firing timestamp. In order to do this now you need to re-calculate the output timestamp (using the same logic as the timer does internally) and manually use withOutputTimestamp.

I'm not sure what the API would look like here, but it would also be nice to allow event-time timers to do the same in reverse (use the element input timestamp rather than the firing timestamp). Maybe something like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
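To make the "re-calculate the output timestamp" step concrete, here is a rough sketch of the duplicated arithmetic in plain `java.time`. It assumes that offset(X) adds X to the current time and that align(Y) then rounds up to the next multiple of Y since the epoch; the exact rule the SDK applies internally may differ, and `AlignedFiringTimestamp` and `firingTimestamp` are hypothetical names, not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch of recomputing a timer's firing timestamp by hand (assumed alignment rule). */
final class AlignedFiringTimestamp {

    /**
     * Assumption: target = now + offset, rounded up to the next multiple of
     * period since the epoch. A zero period means no alignment.
     */
    static Instant firingTimestamp(Instant now, Duration offset, Duration period) {
        Instant target = now.plus(offset);
        if (period.isZero()) {
            return target;
        }
        long periodMillis = period.toMillis();
        long remainder = Math.floorMod(target.toEpochMilli(), periodMillis);
        // Already on a boundary: keep it. Otherwise move forward to the next boundary.
        return remainder == 0 ? target : target.plusMillis(periodMillis - remainder);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-12-07T16:07:00Z");
        // offset = 10 minutes, align = 1 hour: 4:17pm rounds up to 5pm.
        Instant firing = firingTimestamp(now, Duration.ofMinutes(10), Duration.ofHours(1));
        System.out.println(firing);
        // This value is what would then be passed to withOutputTimestamp by hand.
    }
}
```

The point of the request is that this computation has to mirror whatever the timer does internally, so any drift between the two silently produces a different output timestamp.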