[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384058 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 07:55 Start Date: 09/Feb/20 07:55 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384058) Time Spent: 25h 10m (was: 25h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 25h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384054=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384054 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 06:57 Start Date: 09/Feb/20 06:57 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10804: [BEAM-2535] Fix timer map URL: https://github.com/apache/beam/pull/10804 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384054) Time Spent: 25h (was: 24h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 25h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384053=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384053 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 06:56 Start Date: 09/Feb/20 06:56 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-583811815 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384053) Time Spent: 24h 50m (was: 24h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384050=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384050 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 05:35 Start Date: 09/Feb/20 05:35 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10804: [BEAM-2535] Fix timer map URL: https://github.com/apache/beam/pull/10804#issuecomment-583806857 lgtm This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384050) Time Spent: 24h 40m (was: 24.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384037 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 03:31 Start Date: 09/Feb/20 03:31 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10804: [BEAM-2535] Fix timer map URL: https://github.com/apache/beam/pull/10804#issuecomment-583800951 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384037) Time Spent: 24h 20m (was: 24h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=384038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384038 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 09/Feb/20 03:31 Start Date: 09/Feb/20 03:31 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10804: [BEAM-2535] Fix timer map URL: https://github.com/apache/beam/pull/10804#issuecomment-583800979 @rehmanmuradali let me know if these changes look good to you This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 384038) Time Spent: 24.5h (was: 24h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383762 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 07/Feb/20 18:09 Start Date: 07/Feb/20 18:09 Worklog Time Spent: 10m Work Description: aaltay commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-583534120 Pinging to trigger tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383762) Time Spent: 24h 10m (was: 24h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383408=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383408 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 07/Feb/20 06:58 Start Date: 07/Feb/20 06:58 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376238909 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); +} + +@OnTimer(timerId) +public void onTimer( +@StateId("timerFired") ValueState timerFiredState, +OutputReceiver o) { + timerFiredState.write(true); + o.output(100); +} + }; + + TestStream> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceProcessingTime(Duration.standardSeconds(1)) + // Cause fn2 to set a timer. + .addElements(KV.of("key", 1)) + // Normally this would case fn2's timer to expire, but it shouldn't here because of + // the output timestamp. + .advanceProcessingTime(Duration.standardSeconds(9)) Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383408) Time Spent: 24h (was: 23h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 24h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the >
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383282=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383282 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 07/Feb/20 00:15 Start Date: 07/Feb/20 00:15 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376153594 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); +} + +@OnTimer(timerId) +public void onTimer( +@StateId("timerFired") ValueState timerFiredState, +OutputReceiver o) { + timerFiredState.write(true); + o.output(100); +} + }; + + TestStream> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceProcessingTime(Duration.standardSeconds(1)) + // Cause fn2 to set a timer. + .addElements(KV.of("key", 1)) + // Normally this would case fn2's timer to expire, but it shouldn't here because of + // the output timestamp. + .advanceProcessingTime(Duration.standardSeconds(9)) Review comment: yes, but you need to advance the watermark before calling addElements. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383282) Time Spent: 23h 50m (was: 23h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383142=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383142 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 06/Feb/20 20:41 Start Date: 06/Feb/20 20:41 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376070436 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); +} + +@OnTimer(timerId) +public void onTimer( +@StateId("timerFired") ValueState timerFiredState, +OutputReceiver o) { + timerFiredState.write(true); + o.output(100); +} + }; + + TestStream> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceProcessingTime(Duration.standardSeconds(1)) + // Cause fn2 to set a timer. + .addElements(KV.of("key", 1)) + // Normally this would case fn2's timer to expire, but it shouldn't here because of + // the output timestamp. + .advanceProcessingTime(Duration.standardSeconds(9)) Review comment: @reuvenlax, I think if we advance processing time to 11, it will fire fn1's timer as we have set delivery time offset as 10. If we advance processing time by 9 and watermark by 11 then it will test the functionality? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383142) Time Spent: 23h 40m (was: 23.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383137=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383137 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 06/Feb/20 20:37 Start Date: 06/Feb/20 20:37 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376068437 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -597,20 +597,29 @@ public synchronized Instant getEarliestTimerTimestamp() { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet timers : processingTimers.values()) { Review comment: @reuvenlax updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383137) Time Spent: 23h 10m (was: 23h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383138=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383138 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 06/Feb/20 20:37 Start Date: 06/Feb/20 20:37 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376068437 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -597,20 +597,29 @@ public synchronized Instant getEarliestTimerTimestamp() { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet timers : processingTimers.values()) { Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383138) Time Spent: 23h 20m (was: 23h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=383139=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-383139 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 06/Feb/20 20:37 Start Date: 06/Feb/20 20:37 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r376068658 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 383139) Time Spent: 23.5h (was: 23h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=382581=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382581 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 05/Feb/20 22:43 Start Date: 05/Feb/20 22:43 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r375546026 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); Review comment: just use timer.set, not timer.offset This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 382581) Time Spent: 22h 50m (was: 22h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=382583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382583 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 05/Feb/20 22:43 Start Date: 05/Feb/20 22:43 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r375549630 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -597,20 +597,29 @@ public synchronized Instant getEarliestTimerTimestamp() { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet timers : processingTimers.values()) { Review comment: What about the other getEarliestTimerTimestamp function? Does that need to be updated? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 382583) Time Spent: 23h (was: 22h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=382582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382582 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 05/Feb/20 22:43 Start Date: 05/Feb/20 22:43 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r375548830 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3812,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + +@StateId("timerFired") +final StateSpec> timerFiredState = StateSpecs.value(); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@StateId("timerFired") ValueState timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.offset(Duration.standardSeconds(8)).setRelative(); +} + +@OnTimer(timerId) +public void onTimer( +@StateId("timerFired") ValueState timerFiredState, +OutputReceiver o) { + timerFiredState.write(true); + o.output(100); +} + }; + + TestStream> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceProcessingTime(Duration.standardSeconds(1)) + // Cause fn2 to set a timer. + .addElements(KV.of("key", 1)) + // Normally this would case fn2's timer to expire, but it shouldn't here because of + // the output timestamp. + .advanceProcessingTime(Duration.standardSeconds(9)) Review comment: I think you might have to advance processing time to at least 11. You also need to advance the watermark to at least 10 to allow the timer to fire. Right now this isn't testing anything, because the input watermark is preventing the timer from firing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 382582) Time Spent: 23h (was: 22h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 23h > Remaining Estimate: 0h > > Today, we have
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=382058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382058 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 05/Feb/20 04:07 Start Date: 05/Feb/20 04:07 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-582231164 run flink validatesrunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 382058) Time Spent: 22h 40m (was: 22.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=382057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382057 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 05/Feb/20 04:06 Start Date: 05/Feb/20 04:06 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-582231125 run dataflow validatesrunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 382057) Time Spent: 22.5h (was: 22h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380566=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380566 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 18:16 Start Date: 02/Feb/20 18:16 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-581161901 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380566) Time Spent: 22h 20m (was: 22h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380535=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380535 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 14:16 Start Date: 02/Feb/20 14:16 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-581139810 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380535) Time Spent: 22h (was: 21h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380536=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380536 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 14:16 Start Date: 02/Feb/20 14:16 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-581139741 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380536) Time Spent: 22h 10m (was: 22h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 22h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380534=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380534 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 14:15 Start Date: 02/Feb/20 14:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-581139741 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380534) Time Spent: 21h 50m (was: 21h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380450 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 07:48 Start Date: 02/Feb/20 07:48 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-581108218 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380450) Time Spent: 21h 40m (was: 21.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380448 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 07:43 Start Date: 02/Feb/20 07:43 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373826099 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,24 +1107,39 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } - - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); -checkArgument( -!target.isAfter(windowExpiry), -"Attempted to set event time timer that outputs for %s but that is" -+ " after the expiration of window %s", -target, -windowExpiry); + // For processing timers + if (outputTimestamp == null) { Review comment: @reuvenlax we only set outputTimestamp here. I think If that is already set before then we don't need anything. I think else clause is not required here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380448) Time Spent: 21.5h (was: 21h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For >
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380446 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 07:41 Start Date: 02/Feb/20 07:41 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373825991 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,24 +1107,39 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } - - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); -checkArgument( -!target.isAfter(windowExpiry), -"Attempted to set event time timer that outputs for %s but that is" -+ " after the expiration of window %s", -target, -windowExpiry); + // For processing timers + if (outputTimestamp == null) { +// For processing timers output timestamp will be: +// 1) timestamp of input element +// OR +// 2) output timestamp of firing timer. +outputTimestamp = elementInputTimestamp; } + + checkArgument( + !outputTimestamp.isBefore(elementInputTimestamp), + "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", + outputTimestamp, + elementInputTimestamp); Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380446) Time Spent: 21h 20m (was: 21h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380445 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 07:41 Start Date: 02/Feb/20 07:41 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373825967 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -987,6 +997,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { private final TimerSpec spec; private Instant target; private Instant outputTimestamp; +private Instant elementInputTimestamp; Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380445) Time Spent: 21h 10m (was: 21h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380419=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380419 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 03:33 Start Date: 02/Feb/20 03:33 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373816566 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,24 +1107,39 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } - - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); -checkArgument( -!target.isAfter(windowExpiry), -"Attempted to set event time timer that outputs for %s but that is" -+ " after the expiration of window %s", -target, -windowExpiry); + // For processing timers + if (outputTimestamp == null) { +// For processing timers output timestamp will be: +// 1) timestamp of input element +// OR +// 2) output timestamp of firing timer. +outputTimestamp = elementInputTimestamp; } + + checkArgument( + !outputTimestamp.isBefore(elementInputTimestamp), + "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", + outputTimestamp, + elementInputTimestamp); Review comment: check this before setting outputTimestamp above (and only if outputTimestamp != null) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380419) Time Spent: 21h (was: 20h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 21h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380418=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380418 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 03:33 Start Date: 02/Feb/20 03:33 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373810764 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,24 +1107,39 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } - - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); -checkArgument( -!target.isAfter(windowExpiry), -"Attempted to set event time timer that outputs for %s but that is" -+ " after the expiration of window %s", -target, -windowExpiry); + // For processing timers + if (outputTimestamp == null) { Review comment: are we missing an else clause here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380418) Time Spent: 20h 50m (was: 20h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380417=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380417 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 02/Feb/20 03:33 Start Date: 02/Feb/20 03:33 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373812220 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -987,6 +997,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { private final TimerSpec spec; private Instant target; private Instant outputTimestamp; +private Instant elementInputTimestamp; Review comment: final This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380417) Time Spent: 20h 40m (was: 20.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380068 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:03 Start Date: 31/Jan/20 16:03 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373555486 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -597,16 +597,16 @@ public synchronized Instant getEarliestTimerTimestamp() { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet timers : processingTimers.values()) { if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); Review comment: @reuvenlax added a new function to calculate the earliest outputTimestamp from NavigableSet. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380068) Time Spent: 20.5h (was: 20h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380065=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380065 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:02 Start Date: 31/Jan/20 16:02 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373554620 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3807,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); Review comment: @reuvenlax updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380065) Time Spent: 20h 10m (was: 20h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380066=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380066 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:02 Start Date: 31/Jan/20 16:02 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373554887 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -676,7 +676,9 @@ private synchronized void updateTimers(TimerUpdate update) { Map, List> firedTimers; switch (domain) { case PROCESSING_TIME: - firedTimers = extractFiredTimers(firingTime, processingTimers); + firedTimers = + extractFiredTimers( + INSTANT_ORDERING.min(firingTime, earliestHold.get()), processingTimers); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380066) Time Spent: 20h 20m (was: 20h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380064=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380064 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:01 Start Date: 31/Jan/20 16:01 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373554043 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,14 +1107,25 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + // For processing timers + if (outputTimestamp == null) { +// For processing timers output timestamp will be: +// 1) timestamp of input element +// OR +// 2) output timestamp of firing timer. +outputTimestamp = elementInputTimestamp; + } + + checkArgument( + !outputTimestamp.isBefore(elementInputTimestamp), + "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", + outputTimestamp, + elementInputTimestamp); if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380064) Time Spent: 20h (was: 19h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 20h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery.
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380061=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380061 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:00 Start Date: 31/Jan/20 16:00 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373553668 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -930,7 +935,7 @@ public Timer timer(String timerId) { try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( -window, getNamespace(), timerId, spec, stepContext.timerInternals()); +window, getNamespace(), timerId, spec, fireTimestamp(), stepContext.timerInternals()); Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380061) Time Spent: 19h 40m (was: 19.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=380062=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380062 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 16:00 Start Date: 31/Jan/20 16:00 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373553728 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -942,7 +947,12 @@ public TimerMap timerFamily(String timerFamilyId) { TimerSpec spec = (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); return new TimerInternalsTimerMap( -timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals()); +timerFamilyId, +window(), +getNamespace(), +spec, +fireTimestamp(), Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 380062) Time Spent: 19h 50m (was: 19h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379978=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379978 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:09 Start Date: 31/Jan/20 13:09 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726636 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379978) Time Spent: 19h (was: 18h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379981=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379981 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:09 Start Date: 31/Jan/20 13:09 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726285 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379981) Time Spent: 19.5h (was: 19h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379979=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379979 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:09 Start Date: 31/Jan/20 13:09 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726678 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379979) Time Spent: 19h 10m (was: 19h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379980=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379980 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:09 Start Date: 31/Jan/20 13:09 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726236 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379980) Time Spent: 19h 20m (was: 19h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 19h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379977=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379977 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:08 Start Date: 31/Jan/20 13:08 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726285 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379977) Time Spent: 18h 50m (was: 18h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379976=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379976 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 31/Jan/20 13:07 Start Date: 31/Jan/20 13:07 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-580726236 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379976) Time Spent: 18h 40m (was: 18.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379582 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373171076 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,14 +1107,25 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + // For processing timers + if (outputTimestamp == null) { +// For processing timers output timestamp will be: +// 1) timestamp of input element +// OR +// 2) output timestamp of firing timer. +outputTimestamp = elementInputTimestamp; + } + + checkArgument( + !outputTimestamp.isBefore(elementInputTimestamp), + "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", + outputTimestamp, + elementInputTimestamp); if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: check outputTimestamp is in the window for processing-timer as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379582) Time Spent: 18h 10m (was: 18h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379586=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379586 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373175244 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -597,16 +597,16 @@ public synchronized Instant getEarliestTimerTimestamp() { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet timers : processingTimers.values()) { if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); Review comment: I wonder if this is sufficient. The NavigableSet is ordered by timer firing time, not by outputTimestamp. This means that the logic to just look at the first timer (which was correct before we had holds) is probably no longer enough. we probably need to iterate over the set (or we need to have a second data structure to hold the watermark holds and take the minimum of that data structure). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379586) Time Spent: 18.5h (was: 18h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379584 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373174829 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ## @@ -676,7 +676,9 @@ private synchronized void updateTimers(TimerUpdate update) { Map, List> firedTimers; switch (domain) { case PROCESSING_TIME: - firedTimers = extractFiredTimers(firingTime, processingTimers); + firedTimers = + extractFiredTimers( + INSTANT_ORDERING.min(firingTime, earliestHold.get()), processingTimers); Review comment: I don't think this is right. We want to fire timers based on firing time, not based on the hold. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379584) Time Spent: 18.5h (was: 18h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379585=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379585 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373172001 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -3806,6 +3807,90 @@ public void onTimer( pipeline.run(); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class +}) +public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn, KV> fn1 = + new DoFn, KV>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + +@ProcessElement +public void processElement( +@TimerId(timerId) Timer timer, +@Timestamp Instant timestamp, +OutputReceiver> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); +} + +@OnTimer(timerId) +public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn, Integer> fn2 = + new DoFn, Integer>() { + +@TimerId(timerId) +private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); Review comment: I think you want the second DoFn to set an event-time timer. That way you can test that the watermark hold in the first DoFn prevents firing of the second DoFn's timer, which it will only do if the second DoFn's timer is even time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379585) Time Spent: 18.5h (was: 18h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379583 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373170187 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -930,7 +935,7 @@ public Timer timer(String timerId) { try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( -window, getNamespace(), timerId, spec, stepContext.timerInternals()); +window, getNamespace(), timerId, spec, fireTimestamp(), stepContext.timerInternals()); Review comment: I think this should be timestamp(), not fireTimestamp(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379583) Time Spent: 18h 20m (was: 18h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379581=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379581 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 20:25 Start Date: 30/Jan/20 20:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373170316 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -942,7 +947,12 @@ public TimerMap timerFamily(String timerFamilyId) { TimerSpec spec = (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); return new TimerInternalsTimerMap( -timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals()); +timerFamilyId, +window(), +getNamespace(), +spec, +fireTimestamp(), Review comment: ditto - this should be timestamp() not fireTimestamp() This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379581) Time Spent: 18h (was: 17h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 18h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379522=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379522 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 18:20 Start Date: 30/Jan/20 18:20 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373113361 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + if (TimeDomain.PROCESSING_TIME.equals(spec.getTimeDomain())) { +outputTimestamp = +outputTimestampOffset.equals(Duration.ZERO) +? target +: target.minus(offset.minus(outputTimestampOffset)); + } + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379522) Time Spent: 17h 40m (was: 17.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379523=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379523 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 18:20 Start Date: 30/Jan/20 18:20 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373113447 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379523) Time Spent: 17h 50m (was: 17h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379505=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379505 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 17:54 Start Date: 30/Jan/20 17:54 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373100681 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + if (TimeDomain.PROCESSING_TIME.equals(spec.getTimeDomain())) { +outputTimestamp = +outputTimestampOffset.equals(Duration.ZERO) +? target +: target.minus(offset.minus(outputTimestampOffset)); + } + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: @reuvenlax , does that mean we need to compare output timestamp with timerInternal.currentInputWatermarkTime()? I think it will fail our previous test case of output timestamp with event timer as we have set output timestamp as 5 while having a timestamp of input element as 9. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379505) Time Spent: 17.5h (was: 17h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379504=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379504 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 17:53 Start Date: 30/Jan/20 17:53 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373100726 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1074,6 +1075,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) { return this; } +@Override +public Timer withOutputTimestampOffset(Duration outputTimestampOffset) { + this.outputTimestampOffset = outputTimestampOffset; + return this; +} Review comment: @reuvenlax done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379504) Time Spent: 17h 20m (was: 17h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379503=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379503 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 30/Jan/20 17:53 Start Date: 30/Jan/20 17:53 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r373100681 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + if (TimeDomain.PROCESSING_TIME.equals(spec.getTimeDomain())) { +outputTimestamp = +outputTimestampOffset.equals(Duration.ZERO) +? target +: target.minus(offset.minus(outputTimestampOffset)); + } + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: @reuvenlax , does that mean we need to compare output timestamp with timer.currentInputWatermarkTime()? I think it will fail our previous test case of output timestamp with event timer as we have set output timestamp as 5 while having a timestamp of input element as 9. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379503) Time Spent: 17h 10m (was: 17h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378999 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 20:23 Start Date: 29/Jan/20 20:23 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r372609060 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1074,6 +1075,12 @@ public Timer withOutputTimestamp(Instant outputTimestamp) { return this; } +@Override +public Timer withOutputTimestampOffset(Duration outputTimestampOffset) { + this.outputTimestampOffset = outputTimestampOffset; + return this; +} Review comment: I'm not sure that we need withOutputTimestampOffset - I think withOutputTimestamp is sufficient. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378999) Time Spent: 16h 40m (was: 16.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379000 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 20:23 Start Date: 29/Jan/20 20:23 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r372610731 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } + if (TimeDomain.PROCESSING_TIME.equals(spec.getTimeDomain())) { +outputTimestamp = +outputTimestampOffset.equals(Duration.ZERO) +? target +: target.minus(offset.minus(outputTimestampOffset)); + } + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { Review comment: I think that we should verify that the output timestamp is > the timestamp of the input message (if in processElement) or the output timestamp of the firing timer (if in processTimer). The < check remains correct - even for processing-time timers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379000) Time Spent: 16h 50m (was: 16h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=379001=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379001 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 20:23 Start Date: 29/Jan/20 20:23 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#discussion_r372611059 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ## @@ -1092,15 +1099,19 @@ private void verifyAbsoluteTimeDomain() { * */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { -throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); - } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } Review comment: I think that if the timer is processing time, then then default outputTimestamp should be that of the input element (or the output time of the firing timer if in processTimer). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 379001) Time Spent: 17h (was: 16h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 17h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378821 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 14:00 Start Date: 29/Jan/20 14:00 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579729353 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378821) Time Spent: 16h 20m (was: 16h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378822 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 14:00 Start Date: 29/Jan/20 14:00 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579735506 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378822) Time Spent: 16.5h (was: 16h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378789=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378789 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:52 Start Date: 29/Jan/20 12:52 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579741919 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378789) Time Spent: 16h 10m (was: 16h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378764=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378764 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:15 Start Date: 29/Jan/20 12:15 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579729353 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378764) Time Spent: 15h 40m (was: 15.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378766 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:15 Start Date: 29/Jan/20 12:15 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579728737 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378766) Time Spent: 16h (was: 15h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 16h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378765=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378765 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:15 Start Date: 29/Jan/20 12:15 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579728185 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378765) Time Spent: 15h 50m (was: 15h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378763=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378763 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:13 Start Date: 29/Jan/20 12:13 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579728737 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378763) Time Spent: 15.5h (was: 15h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378758=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378758 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:12 Start Date: 29/Jan/20 12:12 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727913 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378758) Time Spent: 15h 20m (was: 15h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378757=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378757 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:11 Start Date: 29/Jan/20 12:11 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579728185 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378757) Time Spent: 15h 10m (was: 15h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378754=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378754 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:11 Start Date: 29/Jan/20 12:11 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727444 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378754) Time Spent: 14h 50m (was: 14h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378755=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378755 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:11 Start Date: 29/Jan/20 12:11 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727705 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378755) Time Spent: 15h (was: 14h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 15h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378750=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378750 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:10 Start Date: 29/Jan/20 12:10 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727705 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378750) Time Spent: 14.5h (was: 14h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378753 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:10 Start Date: 29/Jan/20 12:10 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727913 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378753) Time Spent: 14h 40m (was: 14.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378749=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378749 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 29/Jan/20 12:09 Start Date: 29/Jan/20 12:09 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579727444 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378749) Time Spent: 14h 20m (was: 14h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378387=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378387 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 18:20 Start Date: 28/Jan/20 18:20 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579386009 @reuvenlax really needs your input with the failed test cases as the functionality is completed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378387) Time Spent: 14h (was: 13h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378388=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378388 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 18:20 Start Date: 28/Jan/20 18:20 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579386009 @reuvenlax really need your input with the failed test cases as the functionality is completed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378388) Time Spent: 14h 10m (was: 14h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 14h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378228 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 13:28 Start Date: 28/Jan/20 13:28 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579245093 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378228) Time Spent: 13h 50m (was: 13h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378225=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378225 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 13:26 Start Date: 28/Jan/20 13:26 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579244488 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378225) Time Spent: 13.5h (was: 13h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378227 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 13:26 Start Date: 28/Jan/20 13:26 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579232967 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378227) Time Spent: 13h 40m (was: 13.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378206=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378206 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 12:57 Start Date: 28/Jan/20 12:57 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579232716 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378206) Time Spent: 13h (was: 12h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378207=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378207 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 12:57 Start Date: 28/Jan/20 12:57 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579232716 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378207) Time Spent: 13h 10m (was: 13h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=378208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378208 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Jan/20 12:57 Start Date: 28/Jan/20 12:57 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-579232967 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 378208) Time Spent: 13h 20m (was: 13h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 13h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377742=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377742 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 17:16 Start Date: 27/Jan/20 17:16 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578854223 @reuvenlax I have completed outputTimestamp functionality. Need your input with the failed test cases. I think it is expected with the addition of this feature. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377742) Time Spent: 12h 50m (was: 12h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377616=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377616 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:17 Start Date: 27/Jan/20 13:17 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578741562 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377616) Time Spent: 12h 40m (was: 12.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377614=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377614 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:17 Start Date: 27/Jan/20 13:17 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578741562 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377614) Time Spent: 12h 20m (was: 12h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377613=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377613 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:17 Start Date: 27/Jan/20 13:17 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578741413 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377613) Time Spent: 12h 10m (was: 12h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377615 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:17 Start Date: 27/Jan/20 13:17 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578741413 Run Direct ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377615) Time Spent: 12.5h (was: 12h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377612=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377612 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:16 Start Date: 27/Jan/20 13:16 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740931 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377612) Time Spent: 12h (was: 11h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 12h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377608=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377608 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:15 Start Date: 27/Jan/20 13:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740729 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377608) Time Spent: 11h 20m (was: 11h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377610=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377610 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:15 Start Date: 27/Jan/20 13:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740684 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377610) Time Spent: 11h 40m (was: 11.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377609=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377609 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:15 Start Date: 27/Jan/20 13:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740931 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377609) Time Spent: 11.5h (was: 11h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377607=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377607 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:15 Start Date: 27/Jan/20 13:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740684 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377607) Time Spent: 11h 10m (was: 11h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377611=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377611 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Jan/20 13:15 Start Date: 27/Jan/20 13:15 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578740729 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377611) Time Spent: 11h 50m (was: 11h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377345=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377345 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 26/Jan/20 08:04 Start Date: 26/Jan/20 08:04 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578478056 Pinging to trigger tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377345) Time Spent: 11h (was: 10h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 11h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=377344=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-377344 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 26/Jan/20 08:04 Start Date: 26/Jan/20 08:04 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-578478056 Pinging to trigger tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 377344) Time Spent: 10h 50m (was: 10h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=376400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376400 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 23/Jan/20 18:09 Start Date: 23/Jan/20 18:09 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-577801621 Run JavaPortabilityApi PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 376400) Time Spent: 10h 40m (was: 10.5h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10h 40m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=376398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376398 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 23/Jan/20 18:09 Start Date: 23/Jan/20 18:09 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-577801568 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 376398) Time Spent: 10.5h (was: 10h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=376365=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376365 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 23/Jan/20 17:36 Start Date: 23/Jan/20 17:36 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-577782573 Run JavaPortabilityApi PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 376365) Time Spent: 10h 20m (was: 10h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=376349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376349 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 23/Jan/20 17:21 Start Date: 23/Jan/20 17:21 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-577782573 Run JavaPortabilityApi PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 376349) Time Spent: 10h 10m (was: 10h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=376338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376338 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 23/Jan/20 17:10 Start Date: 23/Jan/20 17:10 Worklog Time Spent: 10m Work Description: aaltay commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-57943 Pinging to trigger tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 376338) Time Spent: 10h (was: 9h 50m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 10h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=374686=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-374686 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 20/Jan/20 18:17 Start Date: 20/Jan/20 18:17 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627#issuecomment-576384782 R: @reuvenlax This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 374686) Time Spent: 9h 50m (was: 9h 40m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 9h 50m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=373630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373630 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 17/Jan/20 14:31 Start Date: 17/Jan/20 14:31 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on pull request #10627: [BEAM-2535] Support outputTimestamp and watermark holds in processing timers. URL: https://github.com/apache/beam/pull/10627 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=364248=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364248 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 28/Dec/19 16:21 Start Date: 28/Dec/19 16:21 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10422: [BEAM-2535] TimerData signature update URL: https://github.com/apache/beam/pull/10422 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 364248) Time Spent: 9.5h (was: 9h 20m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 9.5h > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=363939=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-363939 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Dec/19 12:00 Start Date: 27/Dec/19 12:00 Worklog Time Spent: 10m Work Description: rehmanmuradali commented on issue #10422: [BEAM-2535] TimerData signature update URL: https://github.com/apache/beam/pull/10422#issuecomment-569255404 > SimpleDoFnRunner.testTimerSet is failing @reuvenlax seems like the test case was expecting empty string as timerFamilyId because we have changed it earlier. It's done now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 363939) Time Spent: 9h 20m (was: 9h 10m) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 9h 20m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2535) Allow explicit output time independent of firing specification for all timers
[ https://issues.apache.org/jira/browse/BEAM-2535?focusedWorklogId=363914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-363914 ] ASF GitHub Bot logged work on BEAM-2535: Author: ASF GitHub Bot Created on: 27/Dec/19 09:41 Start Date: 27/Dec/19 09:41 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #10422: [BEAM-2535] TimerData signature update URL: https://github.com/apache/beam/pull/10422#issuecomment-56921 run dataflow validatesrunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 363914) Time Spent: 9h 10m (was: 9h) > Allow explicit output time independent of firing specification for all timers > - > > Key: BEAM-2535 > URL: https://issues.apache.org/jira/browse/BEAM-2535 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Shehzaad Nakhoda >Priority: Major > Time Spent: 9h 10m > Remaining Estimate: 0h > > Today, we have insufficient control over the event time timestamp of elements > output from a timer callback. > 1. For an event time timer, it is the timestamp of the timer itself. > 2. For a processing time timer, it is the current input watermark at the > time of processing. > But for both of these, we may want to reserve the right to output a > particular time, aka set a "watermark hold". > A naive implementation of a {{TimerWithWatermarkHold}} would work for making > sure output is not droppable, but does not fully explain window expiration > and late data/timer dropping. > In the natural interpretation of a timer as a feedback loop on a transform, > timers should be viewed as another channel of input, with a watermark, and > items on that channel _all need event time timestamps even if they are > delivered according to a different time domain_. > I propose that the specification for when a timer should fire should be > separated (with nice defaults) from the specification of the event time of > resulting outputs. These timestamps will determine a side channel with a new > "timer watermark" that constrains the output watermark. > - We still need to fire event time timers according to the input watermark, > so that event time timers fire. > - Late data dropping and window expiration will be in terms of the minimum > of the input watermark and the timer watermark. In this way, whenever a timer > is set, the window is not going to be garbage collected. > - We will need to make sure we have a way to "wake up" a window once it is > expired; this may be as simple as exhausting the timer channel as soon as the > input watermark indicates expiration of a window > This is mostly aimed at end-user timers in a stateful+timely {{DoFn}}. It > seems reasonable to use timers as an implementation detail (e.g. in > runners-core utilities) without wanting any of this additional machinery. For > example, if there is no possibility of output from the timer callback. -- This message was sent by Atlassian Jira (v8.3.4#803005)