This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fa01615e520 Merge pull request #17359: [BEAM-14303] Add a way to 
exclude output timestamp watermark holds
fa01615e520 is described below

commit fa01615e5207b53a7f73ff6a5ff55c336717c88f
Author: Reuven Lax <re...@google.com>
AuthorDate: Thu May 5 14:23:48 2022 -0700

    Merge pull request #17359: [BEAM-14303] Add a way to exclude output 
timestamp watermark holds
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 170 +++++++++------------
 .../beam/runners/core/SimpleDoFnRunnerTest.java    |  10 +-
 .../dataflow/worker/WindmillTimerInternals.java    |  10 +-
 .../main/java/org/apache/beam/sdk/state/Timer.java |   6 +
 .../apache/beam/sdk/transforms/Deduplicate.java    |   8 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  50 ++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  65 +++++---
 .../bigquery/StorageApiWritesShardedRecords.java   |  11 +-
 8 files changed, 186 insertions(+), 144 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a73dd521f86..a7d67bf7526 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -246,6 +246,30 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     return sideInputReader.get(view, sideInputWindow);
   }
 
+  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
+  private void checkTimestamp(Instant elemTimestamp, Instant timestamp) {
+    Instant lowerBound;
+    try {
+      lowerBound = elemTimestamp.minus(fn.getAllowedTimestampSkew());
+    } catch (ArithmeticException e) {
+      lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
+                  + "timestamp of the current input or timer (%s) minus the 
allowed skew (%s) and no "
+                  + "later than %s. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details "
+                  + "on changing the allowed skew.",
+              timestamp,
+              elemTimestamp,
+              fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
+                  ? fn.getAllowedTimestampSkew()
+                  : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+              BoundedWindow.TIMESTAMP_MAX_VALUE));
+    }
+  }
+
   private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> 
windowedElem) {
     checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag);
     outputManager.output(tag, windowedElem);
@@ -389,7 +413,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
+      checkTimestamp(elem.getTimestamp(), timestamp);
       outputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
@@ -402,7 +426,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
       checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
-      checkTimestamp(timestamp);
+      checkTimestamp(elem.getTimestamp(), timestamp);
       outputWindowedValue(
           tag, WindowedValue.of(output, timestamp, elem.getWindows(), 
elem.getPane()));
     }
@@ -416,30 +440,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       return elem.getWindows();
     }
 
-    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
-    private void checkTimestamp(Instant timestamp) {
-      Instant lowerBound;
-      try {
-        lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
-      } catch (ArithmeticException e) {
-        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-      if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
-                    + "timestamp of the current input (%s) minus the allowed 
skew (%s) and no "
-                    + "later than %s. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details "
-                    + "on changing the allowed skew.",
-                timestamp,
-                elem.getTimestamp(),
-                fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
-                    ? fn.getAllowedTimestampSkew()
-                    : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-                BoundedWindow.TIMESTAMP_MAX_VALUE));
-      }
-    }
-
     @Override
     public BoundedWindow window() {
       return Iterables.getOnlyElement(elem.getWindows());
@@ -834,18 +834,19 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      checkTimestamp(timestamp);
+      checkTimestamp(timestamp(), timestamp);
       outputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output) {
+      checkTimestamp(timestamp(), timestamp);
       outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
     }
 
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      checkTimestamp(timestamp);
+      checkTimestamp(timestamp(), timestamp);
       outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
     }
 
@@ -854,30 +855,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       throw new UnsupportedOperationException(
           "Bundle finalization is not supported in non-portable pipelines.");
     }
-
-    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
-    private void checkTimestamp(Instant timestamp) {
-      Instant lowerBound;
-      try {
-        lowerBound = timestamp().minus(fn.getAllowedTimestampSkew());
-      } catch (ArithmeticException e) {
-        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-      if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
-                    + "output timestamp of the timer (%s) minus the allowed 
skew (%s) and no "
-                    + "later than %s. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details "
-                    + "on changing the allowed skew.",
-                timestamp,
-                timestamp(),
-                fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
-                    ? fn.getAllowedTimestampSkew()
-                    : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-                BoundedWindow.TIMESTAMP_MAX_VALUE));
-      }
-    }
   }
 
   /**
@@ -1064,17 +1041,19 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(this.timestamp, timestamp);
       outputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output) {
+      checkTimestamp(this.timestamp, timestamp);
       outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
     }
 
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      checkTimestamp(timestamp);
+      checkTimestamp(this.timestamp, timestamp);
       outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), 
PaneInfo.NO_FIRING));
     }
 
@@ -1083,30 +1062,6 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
       throw new UnsupportedOperationException(
           "Bundle finalization is not supported in non-portable pipelines.");
     }
-
-    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
-    private void checkTimestamp(Instant timestamp) {
-      Instant lowerBound;
-      try {
-        lowerBound = this.timestamp.minus(fn.getAllowedTimestampSkew());
-      } catch (ArithmeticException e) {
-        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-      if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
-                    + "output timestamp of the window (%s) minus the allowed 
skew (%s) and no "
-                    + "later than %s. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details "
-                    + "on changing the allowed skew.",
-                timestamp,
-                this.timestamp,
-                fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
-                    ? fn.getAllowedTimestampSkew()
-                    : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
-                BoundedWindow.TIMESTAMP_MAX_VALUE));
-      }
-    }
   }
 
   private class TimerInternalsTimer implements Timer {
@@ -1121,7 +1076,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     private final String timerFamilyId;
     private final TimerSpec spec;
     private Instant target;
-    private Instant outputTimestamp;
+    private @Nullable Instant outputTimestamp;
+    private boolean noOutputTimestamp;
     private final Instant elementInputTimestamp;
     private Duration period = Duration.ZERO;
     private Duration offset = Duration.ZERO;
@@ -1138,6 +1094,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       this.timerId = timerId;
       this.timerFamilyId = "";
       this.spec = spec;
+      this.noOutputTimestamp = false;
       this.elementInputTimestamp = elementInputTimestamp;
       this.timerInternals = timerInternals;
     }
@@ -1216,6 +1173,14 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
     @Override
     public Timer withOutputTimestamp(Instant outputTimestamp) {
       this.outputTimestamp = outputTimestamp;
+      this.noOutputTimestamp = false;
+      return this;
+    }
+
+    @Override
+    public Timer withNoOutputTimestamp() {
+      this.outputTimestamp = null;
+      this.noOutputTimestamp = true;
       return this;
     }
 
@@ -1251,38 +1216,41 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
                       : 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
                   BoundedWindow.TIMESTAMP_MAX_VALUE));
         }
-      } else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+      } else if (!noOutputTimestamp && 
TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         // The outputTimestamp was unset and this is a timer in the EVENT_TIME 
domain. The output
         // timestamp will be the firing timestamp.
         outputTimestamp = target;
-      } else {
+      } else if (!noOutputTimestamp) {
         // The outputTimestamp was unset and this is a timer in the 
PROCESSING_TIME
         // (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will 
be the timestamp of
         // the element (or timer) setting this timer.
         outputTimestamp = elementInputTimestamp;
       }
-
-      Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, 
allowedLateness);
-      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        checkArgument(
-            !outputTimestamp.isAfter(windowExpiry),
-            "Attempted to set an event-time timer with an output timestamp of 
%s that is"
-                + " after the expiration of window %s",
-            outputTimestamp,
-            windowExpiry);
-        checkArgument(
-            !target.isAfter(windowExpiry),
-            "Attempted to set an event-time timer with a firing timestamp of 
%s that is"
-                + " after the expiration of window %s",
-            target,
-            windowExpiry);
+      if (outputTimestamp != null) {
+        Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, 
allowedLateness);
+        if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+          checkArgument(
+              !outputTimestamp.isAfter(windowExpiry),
+              "Attempted to set an event-time timer with an output timestamp 
of %s that is"
+                  + " after the expiration of window %s",
+              outputTimestamp,
+              windowExpiry);
+          checkArgument(
+              !target.isAfter(windowExpiry),
+              "Attempted to set an event-time timer with a firing timestamp of 
%s that is"
+                  + " after the expiration of window %s",
+              target,
+              windowExpiry);
+        } else {
+          checkArgument(
+              !outputTimestamp.isAfter(windowExpiry),
+              "Attempted to set a processing-time timer with an output 
timestamp of %s that is"
+                  + " after the expiration of window %s",
+              outputTimestamp,
+              windowExpiry);
+        }
       } else {
-        checkArgument(
-            !outputTimestamp.isAfter(windowExpiry),
-            "Attempted to set a processing-time timer with an output timestamp 
of %s that is"
-                + " after the expiration of window %s",
-            outputTimestamp,
-            windowExpiry);
+        outputTimestamp = 
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
       }
     }
 
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 0cb53ed7b2f..51c6e1d83cf 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -318,7 +318,8 @@ public class SimpleDoFnRunnerTest {
         allOf(
             containsString("must be no earlier"),
             containsString(
-                String.format("timestamp of the current input (%s)", new 
Instant(0).toString())),
+                String.format(
+                    "timestamp of the current input or timer (%s)", new 
Instant(0).toString())),
             containsString(
                 String.format(
                     "the allowed skew (%s)",
@@ -369,7 +370,8 @@ public class SimpleDoFnRunnerTest {
         allOf(
             containsString("must be no earlier"),
             containsString(
-                String.format("timestamp of the current input (%s)", new 
Instant(0).toString())),
+                String.format(
+                    "timestamp of the current input or timer (%s)", new 
Instant(0).toString())),
             containsString(
                 String.format(
                     "the allowed skew (%s)",
@@ -626,7 +628,9 @@ public class SimpleDoFnRunnerTest {
         exception.getMessage(),
         allOf(
             containsString("must be no earlier"),
-            containsString(String.format("timestamp of the timer (%s)", new 
Instant(0).toString())),
+            containsString(
+                String.format(
+                    "timestamp of the current input or timer (%s)", new 
Instant(0).toString())),
             containsString(
                 String.format(
                     "the allowed skew (%s)",
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c732bdb1060..b0eb8674d99 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -31,6 +31,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.VarInt;
@@ -40,6 +41,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBase
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Cell;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
@@ -207,7 +209,11 @@ class WindmillTimerInternals implements TimerInternals {
       if (cell.getValue()) {
         // Setting the timer. If it is a user timer, set a hold.
 
-        if (needsWatermarkHold(timerData)) {
+        // Only set a hold if it's needed and if the hold is before the end of 
the global window.
+        if (needsWatermarkHold(timerData)
+            && timerData
+                .getOutputTimestamp()
+                
.isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
           // Setting a timer, clear any prior hold and set to the new value
           outputBuilder
               .addWatermarkHoldsBuilder()
@@ -220,6 +226,8 @@ class WindmillTimerInternals implements TimerInternals {
       } else {
         // Deleting a timer. If it is a user timer, clear the hold
         timer.clearTimestamp();
+        // Clear the hold even if it's the end of the global window in order 
to maintain update
+        // compatibility.
         if (needsWatermarkHold(timerData)) {
           // We are deleting timer; clear the hold
           outputBuilder
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index 78453ee7c1b..efaf0154ef3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -99,6 +99,12 @@ public interface Timer {
    */
   Timer withOutputTimestamp(Instant outputTime);
 
+  /**
+   * Asserts that there is no output timestamp. The output watermark will not 
be held up, and it is
+   * illegal to output messages from this timer using the default output 
timestamp.
+   */
+  Timer withNoOutputTimestamp();
+
   /**
    * Returns the current relative time used by {@link #setRelative()} and 
{@link #offset}. This can
    * be used by a client that self-manages relative timers (e.g. one that 
stores the current timer
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
index e9fa0e2995f..bbbcb859aef 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -307,17 +306,14 @@ public final class Deduplicate {
     @ProcessElement
     public void processElement(
         @Element KV<K, V> element,
-        BoundedWindow window,
         OutputReceiver<KV<K, V>> receiver,
         @StateId(SEEN_STATE) ValueState<Boolean> seenState,
         @TimerId(EXPIRY_TIMER) Timer expiryTimer) {
       Boolean seen = seenState.read();
       // Seen state is either set or not set so if it has been set then it 
must be true.
       if (seen == null) {
-        // We don't want the expiry timer to hold up watermarks, so we set its 
output timestamp to
-        // the end of the
-        // window.
-        
expiryTimer.offset(duration).withOutputTimestamp(window.maxTimestamp()).setRelative();
+        // We don't want the expiry timer to hold up watermarks.
+        expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
         seenState.write(true);
         receiver.output(element);
       }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3207295a755..6cc943e1d88 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4336,6 +4336,56 @@ public class ParDoTest implements Serializable {
       pipeline.run();
     }
 
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    public void testNoOutputTimestampDefaultBounded() throws Exception {
+      runTestNoOutputTimestampDefault(false);
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    public void testNoOutputTimestampDefaultStreaming() throws Exception {
+      runTestNoOutputTimestampDefault(false);
+    }
+
+    public void runTestNoOutputTimestampDefault(boolean useStreaming) throws 
Exception {
+      final String timerId = "foo";
+      DoFn<KV<String, Long>, Long> fn1 =
+          new DoFn<KV<String, Long>, Long>() {
+
+            @TimerId(timerId)
+            private final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) {
+              
timer.withNoOutputTimestamp().set(timestamp.plus(Duration.millis(10)));
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(@Timestamp Instant timestamp, 
OutputReceiver<Long> o) {
+              try {
+                o.output(timestamp.getMillis());
+                fail("Should have failed due to outputting when 
noOutputTimestamp was set.");
+              } catch (IllegalArgumentException e) {
+                System.err.println("EXCEPTION " + e.getMessage() + " stack ");
+                e.printStackTrace();
+                Preconditions.checkState(e.getMessage().contains("Cannot 
output with timestamp"));
+              }
+            }
+          };
+
+      if (useStreaming) {
+        pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+      }
+      PCollection<Long> output =
+          pipeline
+              .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 
1L), new Instant(3))))
+              .apply("first", ParDo.of(fn1));
+
+      pipeline.run();
+    }
+
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class})
     public void testOutOfBoundsEventTimeTimerHold() throws Exception {
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index cf84027a7e9..2d3462eb269 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1774,7 +1774,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     private final BoundedWindow boundedWindow;
     private final PaneInfo paneInfo;
 
-    private Instant outputTimestamp;
+    private @Nullable Instant outputTimestamp;
+    private boolean noOutputTimestamp;
     private Duration period = Duration.ZERO;
     private Duration offset = Duration.ZERO;
 
@@ -1793,6 +1794,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
       this.boundedWindow = boundedWindow;
       this.paneInfo = paneInfo;
+      this.noOutputTimestamp = false;
       this.timeDomain = timeDomain;
 
       switch (timeDomain) {
@@ -1861,6 +1863,14 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     @Override
     public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant 
outputTime) {
       this.outputTimestamp = outputTime;
+      this.noOutputTimestamp = false;
+      return this;
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer withNoOutputTimestamp() {
+      this.outputTimestamp = null;
+      this.noOutputTimestamp = true;
       return this;
     }
 
@@ -1914,40 +1924,45 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       }
 
       // Output timestamp is set to the delivery time if not initialized by an 
user.
-      if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain)) 
{
+      if (!noOutputTimestamp
+          && outputTimestamp == null
+          && TimeDomain.EVENT_TIME.equals(timeDomain)) {
         outputTimestamp = scheduledTime;
       }
 
       // For processing timers
-      if (outputTimestamp == null) {
+      if (!noOutputTimestamp && outputTimestamp == null) {
         // For processing timers output timestamp will be:
         // 1) timestamp of input element
         // OR
         // 2) hold timestamp of firing timer.
         outputTimestamp = elementTimestampOrTimerHoldTimestamp;
       }
-
-      Instant windowExpiry = 
LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
-      if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
-        checkArgument(
-            !outputTimestamp.isAfter(scheduledTime),
-            "Attempted to set an event-time timer with an output timestamp of 
%s that is"
-                + " after the timer firing timestamp %s",
-            outputTimestamp,
-            scheduledTime);
-        checkArgument(
-            !scheduledTime.isAfter(windowExpiry),
-            "Attempted to set an event-time timer with a firing timestamp of 
%s that is"
-                + " after the expiration of window %s",
-            scheduledTime,
-            windowExpiry);
+      if (outputTimestamp != null) {
+        Instant windowExpiry = 
LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+        if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+          checkArgument(
+              !outputTimestamp.isAfter(scheduledTime),
+              "Attempted to set an event-time timer with an output timestamp 
of %s that is"
+                  + " after the timer firing timestamp %s",
+              outputTimestamp,
+              scheduledTime);
+          checkArgument(
+              !scheduledTime.isAfter(windowExpiry),
+              "Attempted to set an event-time timer with a firing timestamp of 
%s that is"
+                  + " after the expiration of window %s",
+              scheduledTime,
+              windowExpiry);
+        } else {
+          checkArgument(
+              !outputTimestamp.isAfter(windowExpiry),
+              "Attempted to set a processing-time timer with an output 
timestamp of %s that is"
+                  + " after the expiration of window %s",
+              outputTimestamp,
+              windowExpiry);
+        }
       } else {
-        checkArgument(
-            !outputTimestamp.isAfter(windowExpiry),
-            "Attempted to set a processing-time timer with an output timestamp 
of %s that is"
-                + " after the expiration of window %s",
-            outputTimestamp,
-            windowExpiry);
+        outputTimestamp = 
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
       }
       return Timer.of(
           userKey,
@@ -2687,6 +2702,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
       @Override
       public void output(OutputT output) {
+        checkTimerTimestamp(currentTimer.getHoldTimestamp());
         outputTo(
             mainOutputConsumers,
             WindowedValue.of(
@@ -2703,6 +2719,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
       @Override
       public <T> void output(TupleTag<T> tag, T output) {
+        checkTimerTimestamp(currentTimer.getHoldTimestamp());
         Collection<FnDataReceiver<WindowedValue<T>>> consumers =
             (Collection) localNameToConsumer.get(tag.getId());
         if (consumers == null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index c0f60f34c61..c9a070fbd8f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -68,7 +68,6 @@ import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ShardedKey;
@@ -262,10 +261,7 @@ public class StorageApiWritesShardedRecords<DestinationT, 
ElementT>
         streamsCreated.inc();
       }
       // Reset the idle timer.
-      streamIdleTimer
-          .offset(streamIdleTime)
-          .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
-          .setRelative();
+      
streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
 
       return stream;
     }
@@ -524,10 +520,7 @@ public class StorageApiWritesShardedRecords<DestinationT, 
ElementT>
 
       java.time.Duration timeElapsed = java.time.Duration.between(now, 
Instant.now());
       appendLatencyDistribution.update(timeElapsed.toMillis());
-      idleTimer
-          .offset(streamIdleTime)
-          .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
-          .setRelative();
+      idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
     }
 
     // called by the idleTimer and window-expiration handlers.

Reply via email to