This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fa01615e520 Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds fa01615e520 is described below commit fa01615e5207b53a7f73ff6a5ff55c336717c88f Author: Reuven Lax <re...@google.com> AuthorDate: Thu May 5 14:23:48 2022 -0700 Merge pull request #17359: [BEAM-14303] Add a way to exclude output timestamp watermark holds --- .../apache/beam/runners/core/SimpleDoFnRunner.java | 170 +++++++++------------ .../beam/runners/core/SimpleDoFnRunnerTest.java | 10 +- .../dataflow/worker/WindmillTimerInternals.java | 10 +- .../main/java/org/apache/beam/sdk/state/Timer.java | 6 + .../apache/beam/sdk/transforms/Deduplicate.java | 8 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 50 ++++++ .../apache/beam/fn/harness/FnApiDoFnRunner.java | 65 +++++--- .../bigquery/StorageApiWritesShardedRecords.java | 11 +- 8 files changed, 186 insertions(+), 144 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index a73dd521f86..a7d67bf7526 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -246,6 +246,30 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out return sideInputReader.get(view, sideInputWindow); } + @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected + private void checkTimestamp(Instant elemTimestamp, Instant timestamp) { + Instant lowerBound; + try { + lowerBound = elemTimestamp.minus(fn.getAllowedTimestampSkew()); + } catch (ArithmeticException e) { + lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + throw new IllegalArgumentException( + String.format( + "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + + "timestamp of the current input or timer (%s) minus the allowed skew (%s) and no " + + "later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details " + + "on changing the allowed skew.", + timestamp, + elemTimestamp, + fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE + ? fn.getAllowedTimestampSkew() + : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), + BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + } + private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag); outputManager.output(tag, windowedElem); @@ -389,7 +413,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(elem.getTimestamp(), timestamp); outputWithTimestamp(mainOutputTag, output, timestamp); } @@ -402,7 +426,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); - checkTimestamp(timestamp); + checkTimestamp(elem.getTimestamp(), timestamp); outputWindowedValue( tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); } @@ -416,30 +440,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out return elem.getWindows(); } - @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected - private void checkTimestamp(Instant timestamp) { - Instant lowerBound; - try { - lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew()); - } catch (ArithmeticException e) { - lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - throw new IllegalArgumentException( - String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s) and no " - + "later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details " - + "on changing the allowed skew.", - timestamp, - elem.getTimestamp(), - fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE - ? fn.getAllowedTimestampSkew() - : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), - BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - } - @Override public BoundedWindow window() { return Iterables.getOnlyElement(elem.getWindows()); @@ -834,18 +834,19 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(timestamp(), timestamp); outputWithTimestamp(mainOutputTag, output, timestamp); } @Override public <T> void output(TupleTag<T> tag, T output) { + checkTimestamp(timestamp(), timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(timestamp(), timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @@ -854,30 +855,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out throw new UnsupportedOperationException( "Bundle finalization is not supported in non-portable pipelines."); } - - @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected - private void checkTimestamp(Instant timestamp) { - Instant lowerBound; - try { - lowerBound = timestamp().minus(fn.getAllowedTimestampSkew()); - } catch (ArithmeticException e) { - lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - throw new IllegalArgumentException( - String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "output timestamp of the timer (%s) minus the allowed skew (%s) and no " - + "later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details " - + "on changing the allowed skew.", - timestamp, - timestamp(), - fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE - ? fn.getAllowedTimestampSkew() - : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), - BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - } } /** @@ -1064,17 +1041,19 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(this.timestamp, timestamp); outputWithTimestamp(mainOutputTag, output, timestamp); } @Override public <T> void output(TupleTag<T> tag, T output) { + checkTimestamp(this.timestamp, timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(this.timestamp, timestamp); outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @@ -1083,30 +1062,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out throw new UnsupportedOperationException( "Bundle finalization is not supported in non-portable pipelines."); } - - @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected - private void checkTimestamp(Instant timestamp) { - Instant lowerBound; - try { - lowerBound = this.timestamp.minus(fn.getAllowedTimestampSkew()); - } catch (ArithmeticException e) { - lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - throw new IllegalArgumentException( - String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "output timestamp of the window (%s) minus the allowed skew (%s) and no " - + "later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details " - + "on changing the allowed skew.", - timestamp, - this.timestamp, - fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE - ? fn.getAllowedTimestampSkew() - : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), - BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - } } private class TimerInternalsTimer implements Timer { @@ -1121,7 +1076,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final String timerFamilyId; private final TimerSpec spec; private Instant target; - private Instant outputTimestamp; + private @Nullable Instant outputTimestamp; + private boolean noOutputTimestamp; private final Instant elementInputTimestamp; private Duration period = Duration.ZERO; private Duration offset = Duration.ZERO; @@ -1138,6 +1094,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out this.timerId = timerId; this.timerFamilyId = ""; this.spec = spec; + this.noOutputTimestamp = false; this.elementInputTimestamp = elementInputTimestamp; this.timerInternals = timerInternals; } @@ -1216,6 +1173,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public Timer withOutputTimestamp(Instant outputTimestamp) { this.outputTimestamp = outputTimestamp; + this.noOutputTimestamp = false; + return this; + } + + @Override + public Timer withNoOutputTimestamp() { + this.outputTimestamp = null; + this.noOutputTimestamp = true; return this; } @@ -1251,38 +1216,41 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out : PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE)); } - } else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + } else if (!noOutputTimestamp && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { // The outputTimestamp was unset and this is a timer in the EVENT_TIME domain. The output // timestamp will be the firing timestamp. outputTimestamp = target; - } else { + } else if (!noOutputTimestamp) { // The outputTimestamp was unset and this is a timer in the PROCESSING_TIME // (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will be the timestamp of // the element (or timer) setting this timer. outputTimestamp = elementInputTimestamp; } - - Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - checkArgument( - !outputTimestamp.isAfter(windowExpiry), - "Attempted to set an event-time timer with an output timestamp of %s that is" - + " after the expiration of window %s", - outputTimestamp, - windowExpiry); - checkArgument( - !target.isAfter(windowExpiry), - "Attempted to set an event-time timer with a firing timestamp of %s that is" - + " after the expiration of window %s", - target, - windowExpiry); + if (outputTimestamp != null) { + Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + checkArgument( + !outputTimestamp.isAfter(windowExpiry), + "Attempted to set an event-time timer with an output timestamp of %s that is" + + " after the expiration of window %s", + outputTimestamp, + windowExpiry); + checkArgument( + !target.isAfter(windowExpiry), + "Attempted to set an event-time timer with a firing timestamp of %s that is" + + " after the expiration of window %s", + target, + windowExpiry); + } else { + checkArgument( + !outputTimestamp.isAfter(windowExpiry), + "Attempted to set a processing-time timer with an output timestamp of %s that is" + + " after the expiration of window %s", + outputTimestamp, + windowExpiry); + } } else { - checkArgument( - !outputTimestamp.isAfter(windowExpiry), - "Attempted to set a processing-time timer with an output timestamp of %s that is" - + " after the expiration of window %s", - outputTimestamp, - windowExpiry); + outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 0cb53ed7b2f..51c6e1d83cf 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -318,7 +318,8 @@ public class SimpleDoFnRunnerTest { allOf( containsString("must be no earlier"), containsString( - String.format("timestamp of the current input (%s)", new Instant(0).toString())), + String.format( + "timestamp of the current input or timer (%s)", new Instant(0).toString())), containsString( String.format( "the allowed skew (%s)", @@ -369,7 +370,8 @@ public class SimpleDoFnRunnerTest { allOf( containsString("must be no earlier"), containsString( - String.format("timestamp of the current input (%s)", new Instant(0).toString())), + String.format( + "timestamp of the current input or timer (%s)", new Instant(0).toString())), containsString( String.format( "the allowed skew (%s)", @@ -626,7 +628,9 @@ public class SimpleDoFnRunnerTest { exception.getMessage(), allOf( containsString("must be no earlier"), - containsString(String.format("timestamp of the timer (%s)", new Instant(0).toString())), + containsString( + String.format( + "timestamp of the current input or timer (%s)", new Instant(0).toString())), containsString( String.format( "the allowed skew (%s)", diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index c732bdb1060..b0eb8674d99 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.ExposedByteArrayInputStream; import org.apache.beam.sdk.util.ExposedByteArrayOutputStream; import org.apache.beam.sdk.util.VarInt; @@ -40,6 +41,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBase import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Cell; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -207,7 +209,11 @@ class WindmillTimerInternals implements TimerInternals { if (cell.getValue()) { // Setting the timer. If it is a user timer, set a hold. - if (needsWatermarkHold(timerData)) { + // Only set a hold if it's needed and if the hold is before the end of the global window. + if (needsWatermarkHold(timerData) + && timerData + .getOutputTimestamp() + .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() @@ -220,6 +226,8 @@ class WindmillTimerInternals implements TimerInternals { } else { // Deleting a timer. If it is a user timer, clear the hold timer.clearTimestamp(); + // Clear the hold even if it's the end of the global window in order to maintain update + // compatibility. if (needsWatermarkHold(timerData)) { // We are deleting timer; clear the hold outputBuilder diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java index 78453ee7c1b..efaf0154ef3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java @@ -99,6 +99,12 @@ public interface Timer { */ Timer withOutputTimestamp(Instant outputTime); + /** + * Asserts that there is no output timestamp. The output watermark will not be held up, and it is + * illegal to output messages from this timer using the default output timestamp. + */ + Timer withNoOutputTimestamp(); + /** * Returns the current relative time used by {@link #setRelative()} and {@link #offset}. This can * be used by a client that self-manages relative timers (e.g. one that stores the current timer diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java index e9fa0e2995f..bbbcb859aef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -307,17 +306,14 @@ public final class Deduplicate { @ProcessElement public void processElement( @Element KV<K, V> element, - BoundedWindow window, OutputReceiver<KV<K, V>> receiver, @StateId(SEEN_STATE) ValueState<Boolean> seenState, @TimerId(EXPIRY_TIMER) Timer expiryTimer) { Boolean seen = seenState.read(); // Seen state is either set or not set so if it has been set then it must be true. if (seen == null) { - // We don't want the expiry timer to hold up watermarks, so we set its output timestamp to - // the end of the - // window. - expiryTimer.offset(duration).withOutputTimestamp(window.maxTimestamp()).setRelative(); + // We don't want the expiry timer to hold up watermarks. + expiryTimer.offset(duration).withNoOutputTimestamp().setRelative(); seenState.write(true); receiver.output(element); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 3207295a755..6cc943e1d88 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -4336,6 +4336,56 @@ public class ParDoTest implements Serializable { pipeline.run(); } + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + public void testNoOutputTimestampDefaultBounded() throws Exception { + runTestNoOutputTimestampDefault(false); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + public void testNoOutputTimestampDefaultStreaming() throws Exception { + runTestNoOutputTimestampDefault(false); + } + + public void runTestNoOutputTimestampDefault(boolean useStreaming) throws Exception { + final String timerId = "foo"; + DoFn<KV<String, Long>, Long> fn1 = + new DoFn<KV<String, Long>, Long>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) { + timer.withNoOutputTimestamp().set(timestamp.plus(Duration.millis(10))); + } + + @OnTimer(timerId) + public void onTimer(@Timestamp Instant timestamp, OutputReceiver<Long> o) { + try { + o.output(timestamp.getMillis()); + fail("Should have failed due to outputting when noOutputTimestamp was set."); + } catch (IllegalArgumentException e) { + System.err.println("EXCEPTION " + e.getMessage() + " stack "); + e.printStackTrace(); + Preconditions.checkState(e.getMessage().contains("Cannot output with timestamp")); + } + } + }; + + if (useStreaming) { + pipeline.getOptions().as(StreamingOptions.class).setStreaming(true); + } + PCollection<Long> output = + pipeline + .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3)))) + .apply("first", ParDo.of(fn1)); + + pipeline.run(); + } + @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class}) public void testOutOfBoundsEventTimeTimerHold() throws Exception { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index cf84027a7e9..2d3462eb269 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1774,7 +1774,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator private final BoundedWindow boundedWindow; private final PaneInfo paneInfo; - private Instant outputTimestamp; + private @Nullable Instant outputTimestamp; + private boolean noOutputTimestamp; private Duration period = Duration.ZERO; private Duration offset = Duration.ZERO; @@ -1793,6 +1794,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; this.boundedWindow = boundedWindow; this.paneInfo = paneInfo; + this.noOutputTimestamp = false; this.timeDomain = timeDomain; switch (timeDomain) { @@ -1861,6 +1863,14 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) { this.outputTimestamp = outputTime; + this.noOutputTimestamp = false; + return this; + } + + @Override + public org.apache.beam.sdk.state.Timer withNoOutputTimestamp() { + this.outputTimestamp = null; + this.noOutputTimestamp = true; return this; } @@ -1914,40 +1924,45 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain)) { + if (!noOutputTimestamp + && outputTimestamp == null + && TimeDomain.EVENT_TIME.equals(timeDomain)) { outputTimestamp = scheduledTime; } // For processing timers - if (outputTimestamp == null) { + if (!noOutputTimestamp && outputTimestamp == null) { // For processing timers output timestamp will be: // 1) timestamp of input element // OR // 2) hold timestamp of firing timer. outputTimestamp = elementTimestampOrTimerHoldTimestamp; } - - Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness); - if (TimeDomain.EVENT_TIME.equals(timeDomain)) { - checkArgument( - !outputTimestamp.isAfter(scheduledTime), - "Attempted to set an event-time timer with an output timestamp of %s that is" - + " after the timer firing timestamp %s", - outputTimestamp, - scheduledTime); - checkArgument( - !scheduledTime.isAfter(windowExpiry), - "Attempted to set an event-time timer with a firing timestamp of %s that is" - + " after the expiration of window %s", - scheduledTime, - windowExpiry); + if (outputTimestamp != null) { + Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness); + if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + checkArgument( + !outputTimestamp.isAfter(scheduledTime), + "Attempted to set an event-time timer with an output timestamp of %s that is" + + " after the timer firing timestamp %s", + outputTimestamp, + scheduledTime); + checkArgument( + !scheduledTime.isAfter(windowExpiry), + "Attempted to set an event-time timer with a firing timestamp of %s that is" + + " after the expiration of window %s", + scheduledTime, + windowExpiry); + } else { + checkArgument( + !outputTimestamp.isAfter(windowExpiry), + "Attempted to set a processing-time timer with an output timestamp of %s that is" + + " after the expiration of window %s", + outputTimestamp, + windowExpiry); + } } else { - checkArgument( - !outputTimestamp.isAfter(windowExpiry), - "Attempted to set a processing-time timer with an output timestamp of %s that is" - + " after the expiration of window %s", - outputTimestamp, - windowExpiry); + outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); } return Timer.of( userKey, @@ -2687,6 +2702,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public void output(OutputT output) { + checkTimerTimestamp(currentTimer.getHoldTimestamp()); outputTo( mainOutputConsumers, WindowedValue.of( @@ -2703,6 +2719,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public <T> void output(TupleTag<T> tag, T output) { + checkTimerTimestamp(currentTimer.getHoldTimestamp()); Collection<FnDataReceiver<WindowedValue<T>>> consumers = (Collection) localNameToConsumer.get(tag.getId()); if (consumers == null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index c0f60f34c61..c9a070fbd8f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -68,7 +68,6 @@ import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; @@ -262,10 +261,7 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT> streamsCreated.inc(); } // Reset the idle timer. - streamIdleTimer - .offset(streamIdleTime) - .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp()) - .setRelative(); + streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative(); return stream; } @@ -524,10 +520,7 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT> java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); appendLatencyDistribution.update(timeElapsed.toMillis()); - idleTimer - .offset(streamIdleTime) - .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp()) - .setRelative(); + idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative(); } // called by the idleTimer and window-expiration handlers.