Repository: beam
Updated Branches:
  refs/heads/master 9061c65e3 -> db19c7df5


Reify delay and alignment in AfterProcessingTime transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40c4a5cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40c4a5cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40c4a5cb

Branch: refs/heads/master
Commit: 40c4a5cb6eaa0350a26fe1f215eb812541a7b105
Parents: bea101a
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Feb 12 15:03:48 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 14 14:55:48 2017 -0800

----------------------------------------------------------------------
 .../AfterDelayFromFirstElementStateMachine.java |   4 +-
 .../AfterProcessingTimeStateMachine.java        |   2 +
 ...rSynchronizedProcessingTimeStateMachine.java |   1 +
 .../core/triggers/TriggerStateMachines.java     |  36 ++-
 .../core/triggers/TriggerStateMachinesTest.java |   7 +-
 .../windowing/AfterDelayFromFirstElement.java   | 240 -------------------
 .../windowing/AfterProcessingTime.java          | 105 ++++++--
 .../AfterSynchronizedProcessingTime.java        |  25 +-
 .../windowing/TimestampTransform.java           |  64 +++++
 .../windowing/AfterProcessingTimeTest.java      |   2 +-
 10 files changed, 192 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index b720644..4444c22 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -47,6 +47,8 @@ import org.joda.time.format.PeriodFormatter;
  *
  * <p>This class is for internal use only and may change at any time.
  */
+// This class should be inlined to subclasses and deleted, simplifying them too
+// https://issues.apache.org/jira/browse/BEAM-1486
 @Experimental(Experimental.Kind.TRIGGER)
 public abstract class AfterDelayFromFirstElementStateMachine extends 
OnceTriggerStateMachine {
 
@@ -250,7 +252,7 @@ public abstract class 
AfterDelayFromFirstElementStateMachine extends OnceTrigger
   /**
    * A {@link SerializableFunction} to delay the timestamp at which this 
triggers fires.
    */
-  private static final class DelayFn implements SerializableFunction<Instant, 
Instant> {
+  static final class DelayFn implements SerializableFunction<Instant, Instant> 
{
     private final Duration delay;
 
     public DelayFn(Duration delay) {

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
index 2490463..eaf5613 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java
@@ -34,6 +34,8 @@ import org.joda.time.Instant;
  * AfterDelayFromFirstElementStateMachine#plusDelayOf} or {@link
  * AfterDelayFromFirstElementStateMachine#alignedTo}.
  */
+// The superclass should be inlined here, its only real use
+// https://issues.apache.org/jira/browse/BEAM-1486
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterProcessingTimeStateMachine extends 
AfterDelayFromFirstElementStateMachine {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
index 000f6e7..1319a13 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
+// This should not really have the superclass 
https://issues.apache.org/jira/browse/BEAM-1486
 class AfterSynchronizedProcessingTimeStateMachine extends 
AfterDelayFromFirstElementStateMachine {
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
index 1be7981..f0e9d21 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -18,14 +18,16 @@
 package org.apache.beam.runners.core.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
+import javax.annotation.Nonnull;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
 import org.apache.beam.sdk.transforms.windowing.AfterEach;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -36,6 +38,7 @@ import 
org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
 import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.apache.beam.sdk.util.ReshuffleTrigger;
@@ -171,18 +174,37 @@ public class TriggerStateMachines {
     }
 
     private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
-      return evaluateSpecific((AfterDelayFromFirstElement) v);
-    }
-
-    private OnceTriggerStateMachine evaluateSpecific(final 
AfterDelayFromFirstElement v) {
       return new AfterDelayFromFirstElementStateMachineAdapter(v);
     }
 
     private static class AfterDelayFromFirstElementStateMachineAdapter
         extends AfterDelayFromFirstElementStateMachine {
 
-      public 
AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) {
-        this(v.getTimeDomain(), v.getTimestampMappers());
+      private static final Function<TimestampTransform, 
SerializableFunction<Instant, Instant>>
+          CONVERT_TIMESTAMP_TRANSFORM =
+              new Function<TimestampTransform, SerializableFunction<Instant, 
Instant>>() {
+                @Override
+                public SerializableFunction<Instant, Instant> apply(
+                    @Nonnull TimestampTransform transform) {
+                  if (transform instanceof TimestampTransform.Delay) {
+                    return new DelayFn(((TimestampTransform.Delay) 
transform).getDelay());
+                  } else if (transform instanceof TimestampTransform.AlignTo) {
+                    TimestampTransform.AlignTo alignTo = 
(TimestampTransform.AlignTo) transform;
+                    return new AlignFn(alignTo.getPeriod(), 
alignTo.getOffset());
+                  } else {
+                    throw new IllegalArgumentException(
+                        String.format(
+                            "Unknown %s: %s", 
TimestampTransform.class.getSimpleName(), transform));
+                  }
+                }
+              };
+
+      public AfterDelayFromFirstElementStateMachineAdapter(AfterProcessingTime 
v) {
+        this(
+            TimeDomain.PROCESSING_TIME,
+            FluentIterable.from(v.getTimestampTransforms())
+                .transform(CONVERT_TIMESTAMP_TRANSFORM)
+                .toList());
       }
 
       private AfterDelayFromFirstElementStateMachineAdapter(

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
index 37f8f10..26c0597 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
 
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
 import org.apache.beam.sdk.transforms.windowing.AfterEach;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -64,7 +63,7 @@ public class TriggerStateMachinesTest {
     Duration minutes = Duration.standardMinutes(94);
     Duration hours = Duration.standardHours(13);
 
-    AfterDelayFromFirstElement trigger =
+    AfterProcessingTime trigger =
         
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
 
     AfterDelayFromFirstElementStateMachine machine =
@@ -72,10 +71,6 @@ public class TriggerStateMachinesTest {
             TriggerStateMachines.stateMachineForOnceTrigger(trigger);
 
     assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
-
-    // This equality is function equality, but due to the structure of the 
code (no serialization)
-    // it is OK to check
-    assertThat(machine.getTimestampMappers(), 
equalTo(trigger.getTimestampMappers()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
deleted file mode 100644
index da6d74a..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-import org.joda.time.format.PeriodFormatter;
-
-/**
- * A base class for triggers that happen after a processing time delay from 
the arrival
- * of the first element in a pane.
- *
- * <p>This class is for internal use only and may change at any time.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElement extends OnceTrigger {
-
-  protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
-      ImmutableList.<SerializableFunction<Instant, Instant>>of();
-
-  private static final PeriodFormatter PERIOD_FORMATTER = 
PeriodFormat.wordBased(Locale.ENGLISH);
-
-  /**
-   * To complete an implementation, return a new instance like this one, but 
incorporating
-   * the provided timestamp mapping functions. Generally should be used by 
calling the
-   * constructor of this class from the constructor of the subclass.
-   */
-  protected abstract AfterDelayFromFirstElement newWith(
-      List<SerializableFunction<Instant, Instant>> transform);
-
-  /**
-   * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed 
in sequence. The
-   * overall mapping for an instance `instance` is `m_n(... 
m3(m2(m1(instant))`,
-   * implemented via #computeTargetTimestamp
-   */
-  protected final List<SerializableFunction<Instant, Instant>> 
timestampMappers;
-
-  private final TimeDomain timeDomain;
-
-  public AfterDelayFromFirstElement(
-      TimeDomain timeDomain,
-      List<SerializableFunction<Instant, Instant>> timestampMappers) {
-    super(null);
-    this.timestampMappers = timestampMappers;
-    this.timeDomain = timeDomain;
-  }
-
-  /**
-   * The time domain according for which this trigger sets timers.
-   */
-  public TimeDomain getTimeDomain() {
-    return timeDomain;
-  }
-
-  /**
-   * The mapping functions applied to the arrival time of an element to 
determine when to
-   * set a wake-up timer for triggering.
-   */
-  public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
-    return timestampMappers;
-  }
-
-  /**
-   * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
-   * than the timestamp.
-   *
-   * <p>TODO: Consider sharing this with FixedWindows, and bring over the 
equivalent of
-   * CalendarWindows.
-   */
-  public AfterDelayFromFirstElement alignedTo(final Duration size, final 
Instant offset) {
-    return newWith(new AlignFn(size, offset));
-  }
-
-  /**
-   * Aligns the time to be the smallest multiple of {@code size} greater than 
the timestamp
-   * since the epoch.
-   */
-  public AfterDelayFromFirstElement alignedTo(final Duration size) {
-    return alignedTo(size, new Instant(0));
-  }
-
-  /**
-   * Adds some delay to the original target time.
-   *
-   * @param delay the delay to add
-   * @return An updated time trigger that will wait the additional time before 
firing.
-   */
-  public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
-    return newWith(new DelayFn(delay));
-  }
-
-  /**
-   * @deprecated This will be removed in the next major version. Please use 
only
-   *             {@link #plusDelayOf} and {@link #alignedTo}.
-   */
-  @Deprecated
-  public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> 
timestampMapper) {
-    return newWith(timestampMapper);
-  }
-
-  @Override
-  public boolean isCompatible(Trigger other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
-    return this.timestampMappers.equals(that.timestampMappers);
-  }
-
-
-  private AfterDelayFromFirstElement newWith(
-      SerializableFunction<Instant, Instant> timestampMapper) {
-    return newWith(
-        ImmutableList.<SerializableFunction<Instant, Instant>>builder()
-            .addAll(timestampMappers)
-            .add(timestampMapper)
-            .build());
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  protected Instant computeTargetTimestamp(Instant time) {
-    Instant result = time;
-    for (SerializableFunction<Instant, Instant> timestampMapper : 
timestampMappers) {
-      result = timestampMapper.apply(result);
-    }
-    return result;
-  }
-
-  /**
-   * A {@link SerializableFunction} to delay the timestamp at which this 
triggers fires.
-   */
-  private static final class DelayFn implements SerializableFunction<Instant, 
Instant> {
-    private final Duration delay;
-
-    public DelayFn(Duration delay) {
-      this.delay = delay;
-    }
-
-    @Override
-    public Instant apply(Instant input) {
-      return input.plus(delay);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof DelayFn)) {
-        return false;
-      }
-
-      return this.delay.equals(((DelayFn) object).delay);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(delay);
-    }
-
-    @Override
-    public String toString() {
-      return PERIOD_FORMATTER.print(delay.toPeriod());
-    }
-  }
-
-  /**
-   * A {@link SerializableFunction} to align an instant to the nearest 
interval boundary.
-   */
-  static final class AlignFn implements SerializableFunction<Instant, Instant> 
{
-    private final Duration size;
-    private final Instant offset;
-
-
-    /**
-     * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
-     * than the timestamp.
-     */
-    public AlignFn(Duration size, Instant offset) {
-      this.size = size;
-      this.offset = offset;
-    }
-
-    @Override
-    public Instant apply(Instant point) {
-      long millisSinceStart = new Duration(offset, point).getMillis() % 
size.getMillis();
-      return millisSinceStart == 0 ? point : 
point.plus(size).minus(millisSinceStart);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof AlignFn)) {
-        return false;
-      }
-
-      AlignFn other = (AlignFn) object;
-      return other.size.equals(this.size)
-          && other.offset.equals(this.offset);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(size, offset);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index 09f288e..eda269a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -17,40 +17,87 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+import org.joda.time.format.PeriodFormatter;
 
 /**
  * {@code AfterProcessingTime} triggers fire based on the current processing 
time. They operate in
  * the real-time domain.
- *
- * <p>The time at which to fire the timer can be adjusted via the methods in
- * {@link AfterDelayFromFirstElement}, such as {@link 
AfterDelayFromFirstElement#plusDelayOf} or
- * {@link AfterDelayFromFirstElement#alignedTo}.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterProcessingTime extends AfterDelayFromFirstElement {
+public class AfterProcessingTime extends OnceTrigger {
+
+  private static final PeriodFormatter DURATION_FORMATTER = 
PeriodFormat.wordBased(Locale.ENGLISH);
+
+  private final List<TimestampTransform> timestampTransforms;
 
-  private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> 
transforms) {
-    super(TimeDomain.PROCESSING_TIME, transforms);
+  public AfterProcessingTime(List<TimestampTransform> timestampTransforms) {
+    super(null);
+    this.timestampTransforms = timestampTransforms;
   }
 
   /**
-   * Creates a trigger that fires when the current processing time passes the 
processing time
-   * at which this trigger saw the first element in a pane.
+   * Creates a trigger that fires when the current processing time passes the 
processing time at
+   * which this trigger saw the first element in a pane.
    */
   public static AfterProcessingTime pastFirstElementInPane() {
-    return new AfterProcessingTime(IDENTITY);
+    return new 
AfterProcessingTime(Collections.<TimestampTransform>emptyList());
+  }
+
+  /**
+   * The transforms applied to the arrival time of an element to determine 
when this trigger allows
+   * output.
+   */
+  public List<TimestampTransform> getTimestampTransforms() {
+    return timestampTransforms;
+  }
+
+  /**
+   * Adds some delay to the original target time.
+   *
+   * @param delay the delay to add
+   * @return An updated time trigger that will wait the additional time before 
firing.
+   */
+  public AfterProcessingTime plusDelayOf(final Duration delay) {
+    return new AfterProcessingTime(
+        ImmutableList.<TimestampTransform>builder()
+            .addAll(timestampTransforms)
+            .add(TimestampTransform.delay(delay))
+            .build());
+  }
+
+  /**
+   * Aligns timestamps to the smallest multiple of {@code period} since the 
{@code offset} greater
+   * than the timestamp.
+   */
+  public AfterProcessingTime alignedTo(final Duration period, final Instant 
offset) {
+    return new AfterProcessingTime(
+        ImmutableList.<TimestampTransform>builder()
+            .addAll(timestampTransforms)
+            .add(TimestampTransform.alignTo(period, offset))
+            .build());
+  }
+
+  /**
+   * Aligns the time to be the smallest multiple of {@code period} since the 
epoch
+   * boundary (aka {@code new Instant(0)}) greater than the timestamp.
+   */
+  public AfterProcessingTime alignedTo(final Duration period) {
+    return alignedTo(period, new Instant(0));
   }
 
   @Override
-  protected AfterProcessingTime newWith(
-      List<SerializableFunction<Instant, Instant>> transforms) {
-    return new AfterProcessingTime(transforms);
+  public boolean isCompatible(Trigger other) {
+    return this.equals(other);
   }
 
   @Override
@@ -60,17 +107,28 @@ public class AfterProcessingTime extends 
AfterDelayFromFirstElement {
 
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
-    return new AfterSynchronizedProcessingTime();
+    return AfterSynchronizedProcessingTime.ofFirstElement();
   }
 
   @Override
   public String toString() {
     StringBuilder builder = new 
StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
-    for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
-      builder
-          .append(".plusDelayOf(")
-          .append(delayFn)
-          .append(")");
+    for (TimestampTransform transform : getTimestampTransforms()) {
+      if (transform instanceof TimestampTransform.Delay) {
+        TimestampTransform.Delay delay = (TimestampTransform.Delay) transform;
+        builder
+            .append(".plusDelayOf(")
+            .append(DURATION_FORMATTER.print(delay.getDelay().toPeriod()))
+            .append(")");
+      } else if (transform instanceof TimestampTransform.AlignTo) {
+        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) 
transform;
+        builder
+            .append(".alignedTo(")
+            .append(DURATION_FORMATTER.print(alignTo.getPeriod().toPeriod()))
+            .append(", ")
+            .append(alignTo.getOffset())
+            .append(")");
+      }
     }
 
     return builder.toString();
@@ -84,12 +142,13 @@ public class AfterProcessingTime extends 
AfterDelayFromFirstElement {
     if (!(obj instanceof AfterProcessingTime)) {
       return false;
     }
+
     AfterProcessingTime that = (AfterProcessingTime) obj;
-    return Objects.equals(this.timestampMappers, that.timestampMappers);
+    return getTimestampTransforms().equals(that.getTimestampTransforms());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(getClass(), this.timestampMappers);
+    return Objects.hash(getTimestampTransforms());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index b6258f8..6249954 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -18,21 +18,22 @@
 package org.apache.beam.sdk.transforms.windowing;
 
 import com.google.common.base.Objects;
-import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.joda.time.Instant;
 
 /**
- * A trigger that fires after synchronized processing time has reached a shared
- * threshold between upstream workers.
+ * A trigger that fires after synchronized processing time has reached the 
processing time of the
+ * first element's arrival.
+ *
+ * <p>This is for internal use, primarily as a "continuation trigger" for {@link 
AfterProcessingTime}
+ * triggers. In that use, this trigger is ready as soon as all upstream 
workers' processing time
+ * clocks have caught up to the moment that input arrived.
  */
-public class AfterSynchronizedProcessingTime extends 
AfterDelayFromFirstElement {
+public class AfterSynchronizedProcessingTime extends OnceTrigger {
 
   public AfterSynchronizedProcessingTime() {
-    super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
-        Collections.<SerializableFunction<Instant, Instant>>emptyList());
+    super(null);
   }
 
   @Override
@@ -59,12 +60,4 @@ public class AfterSynchronizedProcessingTime extends 
AfterDelayFromFirstElement
   public int hashCode() {
     return Objects.hashCode(AfterSynchronizedProcessingTime.class);
   }
-
-  @Override
-  protected AfterSynchronizedProcessingTime
-      newWith(List<SerializableFunction<Instant, Instant>> transforms) {
-    // ignore transforms
-    return this;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
new file mode 100644
index 0000000..b16e968
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.auto.value.AutoValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** An abstract description of a standardized transformation on timestamps. */
+public abstract class TimestampTransform {
+
+  /** Returns a transform that shifts a timestamp later by {@code delay}. */
+  public static TimestampTransform delay(Duration delay) {
+    return new AutoValue_TimestampTransform_Delay(delay);
+  }
+
+  /**
+   * Returns a transform that aligns a timestamp to the next boundary of 
{@code period}, starting
+   * from {@code offset}.
+   */
+  public static TimestampTransform alignTo(Duration period, Instant offset) {
+    return new AutoValue_TimestampTransform_AlignTo(period, offset);
+  }
+
+  /**
+   * Returns a transform that aligns a timestamp to the next boundary of 
{@code period}, starting
+   * from the start of the epoch.
+   */
+  public static TimestampTransform alignTo(Duration period) {
+    return alignTo(period, new Instant(0));
+  }
+
+  /**
+   * Represents the transform that aligns a timestamp to the next boundary of 
{@link #getPeriod()}
+   * starting at {@link #getOffset()}.
+   */
+  @AutoValue
+  public abstract static class AlignTo extends TimestampTransform {
+    public abstract Duration getPeriod();
+
+    public abstract Instant getOffset();
+  }
+
+  /** Represents the transform that delays a timestamp by {@link #getDelay()}. 
*/
+  @AutoValue
+  public abstract static class Delay extends TimestampTransform {
+    public abstract Duration getDelay();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
index 4984d7c..006ff00 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
@@ -45,7 +45,7 @@ public class AfterProcessingTimeTest {
     OnceTrigger firstElementPlus1 =
         
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
     assertEquals(
-        new AfterSynchronizedProcessingTime(),
+        AfterSynchronizedProcessingTime.ofFirstElement(),
         firstElementPlus1.getContinuationTrigger());
   }
 

Reply via email to