Use informative Instant formatter in WatermarkHold

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6

Branch: refs/heads/python-sdk
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles <k...@google.com>
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 13:45:37 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/WatermarkHold.java  |  4 +++-
 .../sdk/transforms/windowing/BoundedWindow.java  | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     Instant shifted = 
windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
     checkState(!shifted.isBefore(timestamp),
         "OutputTimeFn moved element from %s to earlier time %s for window %s",
-        timestamp, shifted, window);
+        BoundedWindow.formatTimestamp(timestamp),
+        BoundedWindow.formatTimestamp(shifted),
+        window);
     checkState(timestamp.isAfter(window.maxTimestamp())
             || !shifted.isAfter(window.maxTimestamp()),
         "OutputTimeFn moved element from %s to %s which is beyond end of "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as "-infinity".
+   *
+   * <p>This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as "+infinity".
+   *
+   * <p>This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  /**
+   * Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
+   * whether the timestamp is the end of the global window or one of the distinguished values {@link
+   * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
     if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
       return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";

Reply via email to