Move triggers to runners-core

This commit is effectively a "git hint" that all the files
in runners-core are moved from prior files. The moved files
will be replaced.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4398e1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4398e1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4398e1e

Branch: refs/heads/master
Commit: e4398e1e7b08ca5098e422eb62c2d8cb139d906b
Parents: 7322616
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jun 23 20:05:27 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Oct 13 14:34:34 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/TriggerRunner.java | 247 ---------
 .../beam/runners/core/reactors/AfterAll.java    | 122 +++++
 .../reactors/AfterDelayFromFirstElement.java    | 335 ++++++++++++
 .../beam/runners/core/reactors/AfterEach.java   | 141 +++++
 .../beam/runners/core/reactors/AfterFirst.java  | 124 +++++
 .../beam/runners/core/reactors/AfterPane.java   | 144 +++++
 .../core/reactors/AfterProcessingTime.java      | 102 ++++
 .../AfterSynchronizedProcessingTime.java        |  73 +++
 .../runners/core/reactors/AfterWatermark.java   | 355 +++++++++++++
 .../runners/core/reactors/DefaultTrigger.java   |  92 ++++
 .../core/reactors/ExecutableTrigger.java        | 159 ++++++
 .../runners/core/reactors/FinishedTriggers.java |  44 ++
 .../core/reactors/FinishedTriggersBitSet.java   |  67 +++
 .../core/reactors/FinishedTriggersSet.java      |  72 +++
 .../beam/runners/core/reactors/Never.java       |  75 +++
 .../runners/core/reactors/OrFinallyTrigger.java | 105 ++++
 .../beam/runners/core/reactors/Repeatedly.java  | 101 ++++
 .../runners/core/reactors/ReshuffleTrigger.java |  66 +++
 .../beam/runners/core/reactors/Trigger.java     | 527 +++++++++++++++++++
 .../core/reactors/TriggerContextFactory.java    | 507 ++++++++++++++++++
 .../runners/core/reactors/TriggerRunner.java    | 247 +++++++++
 .../runners/core/reactors/AfterAllTest.java     | 156 ++++++
 .../runners/core/reactors/AfterEachTest.java    | 132 +++++
 .../runners/core/reactors/AfterFirstTest.java   | 181 +++++++
 .../runners/core/reactors/AfterPaneTest.java    | 132 +++++
 .../core/reactors/AfterProcessingTimeTest.java  | 187 +++++++
 .../AfterSynchronizedProcessingTimeTest.java    | 121 +++++
 .../core/reactors/AfterWatermarkTest.java       | 380 +++++++++++++
 .../core/reactors/DefaultTriggerTest.java       | 176 +++++++
 .../core/reactors/ExecutableTriggerTest.java    | 127 +++++
 .../reactors/FinishedTriggersBitSetTest.java    |  55 ++
 .../reactors/FinishedTriggersProperties.java    | 110 ++++
 .../core/reactors/FinishedTriggersSetTest.java  |  60 +++
 .../beam/runners/core/reactors/NeverTest.java   |  56 ++
 .../core/reactors/OrFinallyTriggerTest.java     | 215 ++++++++
 .../runners/core/reactors/RepeatedlyTest.java   | 224 ++++++++
 .../core/reactors/ReshuffleTriggerTest.java     |  67 +++
 .../beam/runners/core/reactors/StubTrigger.java |  70 +++
 .../beam/runners/core/reactors/TriggerTest.java | 118 +++++
 .../runners/core/reactors/TriggerTester.java    | 410 +++++++++++++++
 .../beam/sdk/transforms/windowing/AfterAll.java | 122 -----
 .../windowing/AfterDelayFromFirstElement.java   | 335 ------------
 .../sdk/transforms/windowing/AfterEach.java     | 141 -----
 .../sdk/transforms/windowing/AfterFirst.java    | 124 -----
 .../sdk/transforms/windowing/AfterPane.java     | 144 -----
 .../windowing/AfterProcessingTime.java          | 102 ----
 .../AfterSynchronizedProcessingTime.java        |  73 ---
 .../transforms/windowing/AfterWatermark.java    | 355 -------------
 .../transforms/windowing/DefaultTrigger.java    |  92 ----
 .../beam/sdk/transforms/windowing/Never.java    |  75 ---
 .../transforms/windowing/OrFinallyTrigger.java  | 105 ----
 .../sdk/transforms/windowing/Repeatedly.java    | 101 ----
 .../beam/sdk/transforms/windowing/Trigger.java  | 527 -------------------
 .../apache/beam/sdk/util/ExecutableTrigger.java | 159 ------
 .../apache/beam/sdk/util/FinishedTriggers.java  |  44 --
 .../beam/sdk/util/FinishedTriggersBitSet.java   |  67 ---
 .../beam/sdk/util/FinishedTriggersSet.java      |  72 ---
 .../apache/beam/sdk/util/ReshuffleTrigger.java  |  66 ---
 .../beam/sdk/util/TriggerContextFactory.java    | 507 ------------------
 .../sdk/transforms/windowing/AfterAllTest.java  | 156 ------
 .../sdk/transforms/windowing/AfterEachTest.java | 132 -----
 .../transforms/windowing/AfterFirstTest.java    | 181 -------
 .../sdk/transforms/windowing/AfterPaneTest.java | 132 -----
 .../windowing/AfterProcessingTimeTest.java      | 187 -------
 .../AfterSynchronizedProcessingTimeTest.java    | 121 -----
 .../windowing/AfterWatermarkTest.java           | 380 -------------
 .../windowing/DefaultTriggerTest.java           | 176 -------
 .../sdk/transforms/windowing/NeverTest.java     |  56 --
 .../windowing/OrFinallyTriggerTest.java         | 215 --------
 .../transforms/windowing/RepeatedlyTest.java    | 224 --------
 .../sdk/transforms/windowing/StubTrigger.java   |  70 ---
 .../sdk/transforms/windowing/TriggerTest.java   | 118 -----
 .../beam/sdk/util/ExecutableTriggerTest.java    | 127 -----
 .../sdk/util/FinishedTriggersBitSetTest.java    |  55 --
 .../sdk/util/FinishedTriggersProperties.java    | 110 ----
 .../beam/sdk/util/FinishedTriggersSetTest.java  |  60 ---
 .../beam/sdk/util/ReshuffleTriggerTest.java     |  67 ---
 .../org/apache/beam/sdk/util/TriggerTester.java | 410 ---------------
 78 files changed, 6435 insertions(+), 6435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which 
subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as 
the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} 
wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent 
state.</li>
- *   <li>Restoring the record of which subtriggers are finished from 
persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable 
information about
- * which subtriggers are finished. This class provides the information when 
building the contexts
- * and commits the information when the method of the {@link 
ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", 
BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> 
contextFactory) {
-    checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't 
need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet 
that is all 0 (not
-      // finished) for each trigger in the tree.
-      return 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the 
specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return 
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, 
rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, 
StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = 
contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> 
state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : 
state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    
rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> 
state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow 
modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
-    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws 
Exception {
-    // shouldFire should be false.
-    // However it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set 
to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits 
anyways.
-    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, 
finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't 
need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
new file mode 100644
index 0000000..cc8c97f
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Create a {@link Trigger} that fires and finishes once after all of its 
sub-triggers have fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterAll extends OnceTrigger {
+
+  private AfterAll(List<Trigger> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
+   */
+  public static OnceTrigger of(OnceTrigger... triggers) {
+    return new AfterAll(Arrays.<Trigger>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
+      // Since subTriggers are all OnceTriggers, they must either CONTINUE or 
FIRE_AND_FINISH.
+      // invokeElement will automatically mark the finish bit if they return 
FIRE_AND_FINISH.
+      subTrigger.invokeOnElement(c);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      subTrigger.invokeOnMerge(c);
+    }
+    boolean allFinished = true;
+    for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
+      allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
+    }
+    c.trigger().setFinished(allFinished);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // This trigger will fire after the latest of its sub-triggers.
+    Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    for (Trigger subTrigger : subTriggers) {
+      Instant subDeadline = 
subTrigger.getWatermarkThatGuaranteesFiring(window);
+      if (deadline.isBefore(subDeadline)) {
+        deadline = subDeadline;
+      }
+    }
+    return deadline;
+  }
+
+  @Override
+  public OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
+    return new AfterAll(continuationTriggers);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true} if all subtriggers return {@code true}.
+   */
+  @Override
+  public boolean shouldFire(TriggerContext context) throws Exception {
+    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+      if (!context.forTrigger(subtrigger).trigger().isFinished()
+          && !subtrigger.invokeShouldFire(context)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to 
{@link #shouldFire}
+   * because they all must be ready to fire.
+   */
+  @Override
+  public void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+      subtrigger.invokeOnFire(context);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterAll.of(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
new file mode 100644
index 0000000..c4bc946
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+import org.joda.time.format.PeriodFormatter;
+
+/**
+ * A base class for triggers that happen after a processing time delay from 
the arrival
+ * of the first element in a pane.
+ *
+ * <p>This class is for internal use only and may change at any time.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public abstract class AfterDelayFromFirstElement extends OnceTrigger {
+
+  protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
+      ImmutableList.<SerializableFunction<Instant, Instant>>of();
+
+  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
+                                              Combine.Holder<Instant>, 
Instant>> DELAYED_UNTIL_TAG =
+      
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+
+  private static final PeriodFormatter PERIOD_FORMATTER = 
PeriodFormat.wordBased(Locale.ENGLISH);
+
+  /**
+   * To complete an implementation, return the desired time from the 
TriggerContext.
+   */
+  @Nullable
+  public abstract Instant getCurrentTime(Trigger.TriggerContext context);
+
+  /**
+   * To complete an implementation, return a new instance like this one, but 
incorporating
+   * the provided timestamp mapping functions. Generally should be used by 
calling the
+   * constructor of this class from the constructor of the subclass.
+   */
+  protected abstract AfterDelayFromFirstElement newWith(
+      List<SerializableFunction<Instant, Instant>> transform);
+
+  /**
+   * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed 
in sequence. The
+   * overall mapping for an instance `instance` is `m_n(... 
m3(m2(m1(instant))`,
+   * implemented via #computeTargetTimestamp
+   */
+  protected final List<SerializableFunction<Instant, Instant>> 
timestampMappers;
+
+  private final TimeDomain timeDomain;
+
+  public AfterDelayFromFirstElement(
+      TimeDomain timeDomain,
+      List<SerializableFunction<Instant, Instant>> timestampMappers) {
+    super(null);
+    this.timestampMappers = timestampMappers;
+    this.timeDomain = timeDomain;
+  }
+
+  private Instant getTargetTimestamp(OnElementContext c) {
+    return computeTargetTimestamp(c.currentProcessingTime());
+  }
+
+  /**
+   * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
+   * than the timestamp.
+   *
+   * <p>TODO: Consider sharing this with FixedWindows, and bring over the 
equivalent of
+   * CalendarWindows.
+   */
+  public AfterDelayFromFirstElement alignedTo(final Duration size, final 
Instant offset) {
+    return newWith(new AlignFn(size, offset));
+  }
+
+  /**
+   * Aligns the time to be the smallest multiple of {@code size} greater than 
the timestamp
+   * since the epoch.
+   */
+  public AfterDelayFromFirstElement alignedTo(final Duration size) {
+    return alignedTo(size, new Instant(0));
+  }
+
+  /**
+   * Adds some delay to the original target time.
+   *
+   * @param delay the delay to add
+   * @return An updated time trigger that will wait the additional time before 
firing.
+   */
+  public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
+    return newWith(new DelayFn(delay));
+  }
+
+  /**
+   * @deprecated This will be removed in the next major version. Please use 
only
+   *             {@link #plusDelayOf} and {@link #alignedTo}.
+   */
+  @Deprecated
+  public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> 
timestampMapper) {
+    return newWith(timestampMapper);
+  }
+
+  @Override
+  public boolean isCompatible(Trigger other) {
+    if (!getClass().equals(other.getClass())) {
+      return false;
+    }
+
+    AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
+    return this.timestampMappers.equals(that.timestampMappers);
+  }
+
+
+  private AfterDelayFromFirstElement newWith(
+      SerializableFunction<Instant, Instant> timestampMapper) {
+    return newWith(
+        ImmutableList.<SerializableFunction<Instant, Instant>>builder()
+            .addAll(timestampMappers)
+            .add(timestampMapper)
+            .build());
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchOnElement(StateAccessor<?> state) {
+    state.access(DELAYED_UNTIL_TAG).readLater();
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    CombiningState<Instant, Instant> delayUntilState = 
c.state().access(DELAYED_UNTIL_TAG);
+    Instant oldDelayUntil = delayUntilState.read();
+
+    // Since processing time can only advance, resulting in target wake-up 
times we would
+    // ignore anyhow, we don't bother with it if it is already set.
+    if (oldDelayUntil != null) {
+      return;
+    }
+
+    Instant targetTimestamp = getTargetTimestamp(c);
+    delayUntilState.add(targetTimestamp);
+    c.setTimer(targetTimestamp, timeDomain);
+  }
+
+  @Override
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    super.prefetchOnMerge(state);
+    StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    // NOTE: We could try to delete all timers which are still active, but we 
would
+    // need access to a timer context for each merging window.
+    // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, 
Instant> state :
+    //    c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
+    //   Instant timestamp = state.get().read();
+    //   if (timestamp != null) {
+    //     <context for merging window>.deleteTimer(timestamp, timeDomain);
+    //   }
+    // }
+    // Instead let them fire and be ignored.
+
+    // If the trigger is already finished, there is no way it will become 
re-activated
+    if (c.trigger().isFinished()) {
+      StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
+      // NOTE: We do not attempt to delete  the timers.
+      return;
+    }
+
+    // Determine the earliest point across all the windows, and delay to that.
+    StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
+
+    Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
+    if (earliestTargetTime != null) {
+      c.setTimer(earliestTargetTime, timeDomain);
+    }
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    state.access(DELAYED_UNTIL_TAG).readLater();
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception {
+    c.state().access(DELAYED_UNTIL_TAG).clear();
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
+    return delayedUntil != null
+        && getCurrentTime(context) != null
+        && getCurrentTime(context).isAfter(delayedUntil);
+  }
+
+  @Override
+  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception 
{
+    clear(context);
+  }
+
+  protected Instant computeTargetTimestamp(Instant time) {
+    Instant result = time;
+    for (SerializableFunction<Instant, Instant> timestampMapper : 
timestampMappers) {
+      result = timestampMapper.apply(result);
+    }
+    return result;
+  }
+
+  /**
+   * A {@link SerializableFunction} to delay the timestamp at which this 
triggers fires.
+   */
+  private static final class DelayFn implements SerializableFunction<Instant, 
Instant> {
+    private final Duration delay;
+
+    public DelayFn(Duration delay) {
+      this.delay = delay;
+    }
+
+    @Override
+    public Instant apply(Instant input) {
+      return input.plus(delay);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+      if (object == this) {
+        return true;
+      }
+
+      if (!(object instanceof DelayFn)) {
+        return false;
+      }
+
+      return this.delay.equals(((DelayFn) object).delay);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(delay);
+    }
+
+    @Override
+    public String toString() {
+      return PERIOD_FORMATTER.print(delay.toPeriod());
+    }
+  }
+
+  /**
+   * A {@link SerializableFunction} to align an instant to the nearest 
interval boundary.
+   */
+  static final class AlignFn implements SerializableFunction<Instant, Instant> 
{
+    private final Duration size;
+    private final Instant offset;
+
+
+    /**
+     * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
+     * than the timestamp.
+     */
+    public AlignFn(Duration size, Instant offset) {
+      this.size = size;
+      this.offset = offset;
+    }
+
+    @Override
+    public Instant apply(Instant point) {
+      long millisSinceStart = new Duration(offset, point).getMillis() % 
size.getMillis();
+      return millisSinceStart == 0 ? point : 
point.plus(size).minus(millisSinceStart);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+      if (object == this) {
+        return true;
+      }
+
+      if (!(object instanceof AlignFn)) {
+        return false;
+      }
+
+      AlignFn other = (AlignFn) object;
+      return other.size.equals(this.size)
+          && other.offset.equals(this.offset);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(size, offset);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
new file mode 100644
index 0000000..629c640
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * A composite {@link Trigger} that executes its sub-triggers in order.
+ * Only one sub-trigger is executing at a time,
+ * and any time it fires the {@code AfterEach} fires. When the currently 
executing
+ * sub-trigger finishes, the {@code AfterEach} starts executing the next 
sub-trigger.
+ *
+ * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the 
sub-triggers have finished.
+ *
+ * <p>The following properties hold:
+ * <ul>
+ *   <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the 
same as
+ *   {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, 
AfterEach.inOrder(b, c)}.
+ *   <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same 
as
+ *   {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
+ * </ul>
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterEach extends Trigger {
+
+  private AfterEach(List<Trigger> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
+   */
+  @SafeVarargs
+  public static Trigger inOrder(Trigger... triggers) {
+    return new AfterEach(Arrays.<Trigger>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    if (!c.trigger().isMerging()) {
+      // If merges are not possible, we need only run the first unfinished 
subtrigger
+      c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+    } else {
+      // If merges are possible, we need to run all subtriggers in parallel
+      for (ExecutableTrigger subTrigger :  c.trigger().subTriggers()) {
+        // Even if the subTrigger is done, it may be revived via merging and 
must have
+        // adequate state.
+        subTrigger.invokeOnElement(c);
+      }
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext context) throws Exception {
+    // If merging makes a subtrigger no-longer-finished, it will automatically
+    // begin participating in shouldFire and onFire appropriately.
+
+    // All the following triggers are retroactively "not started" but that is
+    // also automatic because they are cleared whenever this trigger
+    // fires.
+    boolean priorTriggersAllFinished = true;
+    for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
+      if (priorTriggersAllFinished) {
+        subTrigger.invokeOnMerge(context);
+        priorTriggersAllFinished &= 
context.forTrigger(subTrigger).trigger().isFinished();
+      } else {
+        subTrigger.invokeClear(context);
+      }
+    }
+    updateFinishedState(context);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // This trigger will fire at least once when the first trigger in the 
sequence
+    // fires at least once.
+    return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
+  }
+
+  @Override
+  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    return Repeatedly.forever(new AfterFirst(continuationTriggers));
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    ExecutableTrigger firstUnfinished = 
context.trigger().firstUnfinishedSubTrigger();
+    return firstUnfinished.invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(Trigger.TriggerContext context) throws Exception {
+    context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
+
+    // Reset all subtriggers if in a merging context; any may be revived by 
merging so they are
+    // all run in parallel for each pending pane.
+    if (context.trigger().isMerging()) {
+      for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
+        subTrigger.invokeClear(context);
+      }
+    }
+
+    updateFinishedState(context);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+
+  private void updateFinishedState(TriggerContext context) {
+    
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == 
null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
new file mode 100644
index 0000000..6b06cfa
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Create a composite {@link Trigger} that fires once after at least one of 
its sub-triggers have
+ * fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterFirst extends OnceTrigger {
+
+  AfterFirst(List<Trigger> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
+   */
+  public static OnceTrigger of(OnceTrigger... triggers) {
+    return new AfterFirst(Arrays.<Trigger>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      subTrigger.invokeOnElement(c);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      subTrigger.invokeOnMerge(c);
+    }
+    updateFinishedStatus(c);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // This trigger will fire after the earliest of its sub-triggers.
+    Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (Trigger subTrigger : subTriggers) {
+      Instant subDeadline = 
subTrigger.getWatermarkThatGuaranteesFiring(window);
+      if (deadline.isAfter(subDeadline)) {
+        deadline = subDeadline;
+      }
+    }
+    return deadline;
+  }
+
+  @Override
+  public OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
+    return new AfterFirst(continuationTriggers);
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+      if (context.forTrigger(subtrigger).trigger().isFinished()
+          || subtrigger.invokeShouldFire(context)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+      TriggerContext subContext = context.forTrigger(subtrigger);
+      if (subtrigger.invokeShouldFire(subContext)) {
+        // If the trigger is ready to fire, then do whatever it needs to do.
+        subtrigger.invokeOnFire(subContext);
+      } else {
+        // If the trigger is not ready to fire, it is nonetheless true that 
whatever
+        // pending pane it was tracking is now gone.
+        subtrigger.invokeClear(subContext);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterFirst.of(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+
+  private void updateFinishedStatus(TriggerContext c) {
+    boolean anyFinished = false;
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+    }
+    c.trigger().setFinished(anyFinished);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
new file mode 100644
index 0000000..8c128dd
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.joda.time.Instant;
+
+/**
+ * {@link Trigger}s that fire based on properties of the elements in the 
current pane.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterPane extends OnceTrigger {
+
+private static final StateTag<Object, AccumulatorCombiningState<Long, long[], 
Long>>
+      ELEMENTS_IN_PANE_TAG =
+      
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+  private final int countElems;
+
+  private AfterPane(int countElems) {
+    super(null);
+    this.countElems = countElems;
+  }
+
+  /**
+   * Creates a trigger that fires when the pane contains at least {@code 
countElems} elements.
+   */
+  public static AfterPane elementCountAtLeast(int countElems) {
+    return new AfterPane(countElems);
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
+  }
+
+  @Override
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    super.prefetchOnMerge(state);
+    StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext context) throws Exception {
+    // If we've already received enough elements and finished in some window,
+    // then this trigger is just finished.
+    if (context.trigger().finishedInAnyMergingWindow()) {
+      context.trigger().setFinished(true);
+      StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
+      return;
+    }
+
+    // Otherwise, compute the sum of elements in all the active panes.
+    StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    state.access(ELEMENTS_IN_PANE_TAG).readLater();
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
+    return count >= countElems;
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception {
+    c.state().access(ELEMENTS_IN_PANE_TAG).clear();
+  }
+
+  @Override
+  public boolean isCompatible(Trigger other) {
+    return this.equals(other);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  public OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
+    return AfterPane.elementCountAtLeast(1);
+  }
+
+  @Override
+  public String toString() {
+    return "AfterPane.elementCountAtLeast(" + countElems + ")";
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AfterPane)) {
+      return false;
+    }
+    AfterPane that = (AfterPane) obj;
+    return this.countElems == that.countElems;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(countElems);
+  }
+
+  @Override
+  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception 
{
+    clear(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
new file mode 100644
index 0000000..f551118
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * {@code AfterProcessingTime} triggers fire based on the current processing 
time. They operate in
+ * the real-time domain.
+ *
+ * <p>The time at which to fire the timer can be adjusted via the methods in
+ * {@link AfterDelayFromFirstElement}, such as {@link 
AfterDelayFromFirstElement#plusDelayOf} or
+ * {@link AfterDelayFromFirstElement#alignedTo}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterProcessingTime extends AfterDelayFromFirstElement {
+
+  @Override
+  @Nullable
+  public Instant getCurrentTime(Trigger.TriggerContext context) {
+    return context.currentProcessingTime();
+  }
+
+  private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> 
transforms) {
+    super(TimeDomain.PROCESSING_TIME, transforms);
+  }
+
+  /**
+   * Creates a trigger that fires when the current processing time passes the 
processing time
+   * at which this trigger saw the first element in a pane.
+   */
+  public static AfterProcessingTime pastFirstElementInPane() {
+    return new AfterProcessingTime(IDENTITY);
+  }
+
+  @Override
+  protected AfterProcessingTime newWith(
+      List<SerializableFunction<Instant, Instant>> transforms) {
+    return new AfterProcessingTime(transforms);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
+    return new AfterSynchronizedProcessingTime();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new 
StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
+    for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
+      builder
+          .append(".plusDelayOf(")
+          .append(delayFn)
+          .append(")");
+    }
+
+    return builder.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AfterProcessingTime)) {
+      return false;
+    }
+    AfterProcessingTime that = (AfterProcessingTime) obj;
+    return Objects.equals(this.timestampMappers, that.timestampMappers);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getClass(), this.timestampMappers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
new file mode 100644
index 0000000..59ece10
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.base.Objects;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
+
+  @Override
+  @Nullable
+  public Instant getCurrentTime(Trigger.TriggerContext context) {
+    return context.currentSynchronizedProcessingTime();
+  }
+
+  public AfterSynchronizedProcessingTime() {
+    super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
+        Collections.<SerializableFunction<Instant, Instant>>emptyList());
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return this == obj || obj instanceof AfterSynchronizedProcessingTime;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(AfterSynchronizedProcessingTime.class);
+  }
+
+  @Override
+  protected AfterSynchronizedProcessingTime
+      newWith(List<SerializableFunction<Instant, Instant>> transforms) {
+    // ignore transforms
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
new file mode 100644
index 0000000..e2463d8
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * {@code AfterWatermark} triggers fire based on progress of the system 
watermark. This time is a
+ * lower-bound, sometimes heuristically established, on event times that have 
been fully processed
+ * by the pipeline.
+ *
+ * <p>For sources that provide non-heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event 
times), the
+ * watermark is a strict guarantee that no data with an event time earlier than
+ * that watermark will ever be observed in the pipeline. In this case, it's 
safe to assume that any
+ * pane triggered by an {@code AfterWatermark} trigger with a reference point 
at or beyond the end
+ * of the window will be the last pane ever for that window.
+ *
+ * <p>For sources that provide heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event 
times), the
+ * watermark itself becomes an <i>estimate</i> that no data with an event time 
earlier than that
+ * watermark (i.e. "late data") will ever be observed in the pipeline. These 
heuristics can
+ * often be quite accurate, but the chance of seeing late data for any given 
window is non-zero.
+ * Thus, if absolute correctness over time is important to your use case, you 
may want to consider
+ * using a trigger that accounts for late data. The default trigger,
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
+ * once when the watermark passes the end of the window and then immediately 
therafter when any
+ * late data arrives, is one such example.
+ *
+ * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
+ *
+ * <p>Additionaly firings before or after the watermark can be requested by 
calling
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterWatermark {
+
+  private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
+
+  // Static factory class.
+  private AfterWatermark() {}
+
+  /**
+   * Creates a trigger that fires when the watermark passes the end of the 
window.
+   */
+  public static FromEndOfWindow pastEndOfWindow() {
+    return new FromEndOfWindow();
+  }
+
+  /**
+   * @see AfterWatermark
+   */
+  public static class AfterWatermarkEarlyAndLate extends Trigger {
+
+    private static final int EARLY_INDEX = 0;
+    private static final int LATE_INDEX = 1;
+
+    private final OnceTrigger earlyTrigger;
+    @Nullable
+    private final OnceTrigger lateTrigger;
+
+    @SuppressWarnings("unchecked")
+    public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger 
lateTrigger) {
+      super(lateTrigger == null
+          ? ImmutableList.<Trigger>of(earlyTrigger)
+          : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger));
+      this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not 
be null");
+      this.lateTrigger = lateTrigger;
+    }
+
+    public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
+      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+    }
+
+    public Trigger withLateFirings(OnceTrigger lateTrigger) {
+      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception {
+      if (!c.trigger().isMerging()) {
+        // If merges can never happen, we just run the unfinished subtrigger
+        c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+      } else {
+        // If merges can happen, we run for all subtriggers because they might 
be
+        // de-activated or re-activated
+        for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+          subTrigger.invokeOnElement(c);
+        }
+      }
+    }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception {
+      // NOTE that the ReduceFnRunner will delete all end-of-window timers for 
the
+      // merged-away windows.
+
+      ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
+      // We check the early trigger to determine if we are still processing it 
or
+      // if the end of window has transitioned us to the late trigger
+      OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
+
+      // If the early trigger is still active in any merging window then it is 
still active in
+      // the new merged window, because even if the merged window is "done" 
some pending elements
+      // haven't had a chance to fire.
+      if (!earlyContext.trigger().finishedInAllMergingWindows() || 
!endOfWindowReached(c)) {
+        earlyContext.trigger().setFinished(false);
+        if (lateTrigger != null) {
+          ExecutableTrigger lateSubtrigger = 
c.trigger().subTrigger(LATE_INDEX);
+          OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
+          lateContext.trigger().setFinished(false);
+          lateSubtrigger.invokeClear(lateContext);
+        }
+      } else {
+        // Otherwise the early trigger and end-of-window bit is done for good.
+        earlyContext.trigger().setFinished(true);
+        if (lateTrigger != null) {
+          c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
+        }
+      }
+    }
+
+    @Override
+    public Trigger getContinuationTrigger() {
+      return new AfterWatermarkEarlyAndLate(
+          earlyTrigger.getContinuationTrigger(),
+          lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
+    }
+
+    @Override
+    protected Trigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
+      throw new UnsupportedOperationException(
+          "Should not call getContinuationTrigger(List<Trigger>)");
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      // Even without an early or late trigger, we'll still produce a firing 
at the watermark.
+      return window.maxTimestamp();
+    }
+
+    private boolean endOfWindowReached(Trigger.TriggerContext context) {
+      return context.currentEventTime() != null
+          && 
context.currentEventTime().isAfter(context.window().maxTimestamp());
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      if (!context.trigger().isFinished(EARLY_INDEX)) {
+        // We have not yet transitioned to late firings.
+        // We should fire if either the trigger is ready or we reach the end 
of the window.
+        return 
context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
+            || endOfWindowReached(context);
+      } else if (lateTrigger == null) {
+        return false;
+      } else {
+        // We are running the late trigger
+        return 
context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
+      }
+    }
+
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception {
+      if 
(!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished())
 {
+        onNonLateFiring(context);
+      } else if (lateTrigger != null) {
+        onLateFiring(context);
+      } else {
+        // all done
+        context.trigger().setFinished(true);
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder(TO_STRING);
+
+      if (!(earlyTrigger instanceof Never.NeverTrigger)) {
+        builder
+            .append(".withEarlyFirings(")
+            .append(earlyTrigger)
+            .append(")");
+      }
+
+      if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) 
{
+        builder
+            .append(".withLateFirings(")
+            .append(lateTrigger)
+            .append(")");
+      }
+
+      return builder.toString();
+    }
+
+    private void onNonLateFiring(Trigger.TriggerContext context) throws 
Exception {
+      // We have not yet transitioned to late firings.
+      ExecutableTrigger earlySubtrigger = 
context.trigger().subTrigger(EARLY_INDEX);
+      Trigger.TriggerContext earlyContext = 
context.forTrigger(earlySubtrigger);
+
+      if (!endOfWindowReached(context)) {
+        // This is an early firing, since we have not arrived at the end of 
the window
+        // Implicitly repeats
+        earlySubtrigger.invokeOnFire(context);
+        earlySubtrigger.invokeClear(context);
+        earlyContext.trigger().setFinished(false);
+      } else {
+        // We have arrived at the end of the window; terminate the early 
trigger
+        // and clear out the late trigger's state
+        if (earlySubtrigger.invokeShouldFire(context)) {
+          earlySubtrigger.invokeOnFire(context);
+        }
+        earlyContext.trigger().setFinished(true);
+        earlySubtrigger.invokeClear(context);
+
+        if (lateTrigger == null) {
+          // Done if there is no late trigger.
+          context.trigger().setFinished(true);
+        } else {
+          // If there is a late trigger, we transition to it, and need to 
clear its state
+          // because it was run in parallel.
+          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
+        }
+      }
+
+    }
+
+    private void onLateFiring(Trigger.TriggerContext context) throws Exception 
{
+      // We are firing the late trigger, with implicit repeat
+      ExecutableTrigger lateSubtrigger = 
context.trigger().subTrigger(LATE_INDEX);
+      lateSubtrigger.invokeOnFire(context);
+      // It is a OnceTrigger, so it must have finished; unfinished it and 
clear it
+      lateSubtrigger.invokeClear(context);
+      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
+    }
+  }
+
+  /**
+   * A watermark trigger targeted relative to the end of the window.
+   */
+  public static class FromEndOfWindow extends OnceTrigger {
+
+    private FromEndOfWindow() {
+      super(null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like the this, except that it fires 
repeatedly whenever
+     * the given {@code Trigger} fires before the watermark has passed the end 
of the window.
+     */
+    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger 
earlyFirings) {
+      checkNotNull(earlyFirings, "Must specify the trigger to use for early 
firings");
+      return new AfterWatermarkEarlyAndLate(earlyFirings, null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like the this, except that it fires 
repeatedly whenever
+     * the given {@code Trigger} fires after the watermark has passed the end 
of the window.
+     */
+    public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) 
{
+      checkNotNull(lateFirings, "Must specify the trigger to use for late 
firings");
+      return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception {
+      // We're interested in knowing when the input watermark passes the end 
of the window.
+      // (It is possible this has already happened, in which case the timer 
will be fired
+      // almost immediately).
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception {
+      // NOTE that the ReduceFnRunner will delete all end-of-window timers for 
the
+      // merged-away windows.
+
+      if (!c.trigger().finishedInAllMergingWindows()) {
+        // If the trigger is still active in any merging window then it is 
still active in the new
+        // merged window, because even if the merged window is "done" some 
pending elements haven't
+        // had a chance to fire
+        c.trigger().setFinished(false);
+      } else if (!endOfWindowReached(c)) {
+        // If the end of the new window has not been reached, then the trigger 
is active again.
+        c.trigger().setFinished(false);
+      } else {
+        // Otherwise it is done for good
+        c.trigger().setFinished(true);
+      }
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return window.maxTimestamp();
+    }
+
+    @Override
+    public FromEndOfWindow getContinuationTrigger(List<Trigger> 
continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return TO_STRING;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof FromEndOfWindow;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      return endOfWindowReached(context);
+    }
+
+    private boolean endOfWindowReached(Trigger.TriggerContext context) {
+      return context.currentEventTime() != null
+          && 
context.currentEventTime().isAfter(context.window().maxTimestamp());
+    }
+
+    @Override
+    protected void onOnlyFiring(Trigger.TriggerContext context) throws 
Exception { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
new file mode 100644
index 0000000..d6b72ef
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * A trigger that is equivalent to {@code 
Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} 
for more details.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class DefaultTrigger extends Trigger{
+
+  private DefaultTrigger() {
+    super(null);
+  }
+
+  /**
+   * Returns the default trigger.
+   */
+  public static DefaultTrigger of() {
+    return new DefaultTrigger();
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    // If the end of the window has already been reached, then we are already 
ready to fire
+    // and do not need to set a wake-up timer.
+    if (!endOfWindowReached(c)) {
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    // If the end of the window has already been reached, then we are already 
ready to fire
+    // and do not need to set a wake-up timer.
+    if (!endOfWindowReached(c)) {
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception { }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return window.maxTimestamp();
+  }
+
+  @Override
+  public boolean isCompatible(Trigger other) {
+    // Semantically, all default triggers are identical
+    return other instanceof DefaultTrigger;
+  }
+
+  @Override
+  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+    return this;
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    return endOfWindowReached(context);
+  }
+
+  private boolean endOfWindowReached(Trigger.TriggerContext context) {
+    return context.currentEventTime() != null
+        && context.currentEventTime().isAfter(context.window().maxTimestamp());
+  }
+
+  @Override
+  public void onFire(Trigger.TriggerContext context) throws Exception { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
new file mode 100644
index 0000000..088c499
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+
+/**
+ * A wrapper around a trigger used during execution. While an actual trigger 
may appear multiple
+ * times (both in the same trigger expression and in other trigger 
expressions), the
+ * {@code ExecutableTrigger} wrapped around them forms a tree (only one 
occurrence).
+ */
+public class ExecutableTrigger implements Serializable {
+
+  /** Store the index assigned to this trigger. */
+  private final int triggerIndex;
+  private final int firstIndexAfterSubtree;
+  private final List<ExecutableTrigger> subTriggers = new ArrayList<>();
+  private final Trigger trigger;
+
+  public static <W extends BoundedWindow> ExecutableTrigger create(Trigger 
trigger) {
+    return create(trigger, 0);
+  }
+
+  private static <W extends BoundedWindow> ExecutableTrigger create(
+      Trigger trigger, int nextUnusedIndex) {
+    if (trigger instanceof OnceTrigger) {
+      return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex);
+    } else {
+      return new ExecutableTrigger(trigger, nextUnusedIndex);
+    }
+  }
+
+  public static <W extends BoundedWindow> ExecutableTrigger 
createForOnceTrigger(
+      OnceTrigger trigger, int nextUnusedIndex) {
+    return new ExecutableOnceTrigger(trigger, nextUnusedIndex);
+  }
+
+  private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) {
+    this.trigger = checkNotNull(trigger, "trigger must not be null");
+    this.triggerIndex = nextUnusedIndex++;
+
+    if (trigger.subTriggers() != null) {
+      for (Trigger subTrigger : trigger.subTriggers()) {
+        ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex);
+        subTriggers.add(subExecutable);
+        nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
+      }
+    }
+    firstIndexAfterSubtree = nextUnusedIndex;
+  }
+
+  public List<ExecutableTrigger> subTriggers() {
+    return subTriggers;
+  }
+
+  @Override
+  public String toString() {
+    return trigger.toString();
+  }
+
+  /**
+   * Return the underlying trigger specification corresponding to this {@code 
ExecutableTrigger}.
+   */
+  public Trigger getSpec() {
+    return trigger;
+  }
+
+  public int getTriggerIndex() {
+    return triggerIndex;
+  }
+
+  public final int getFirstIndexAfterSubtree() {
+    return firstIndexAfterSubtree;
+  }
+
+  public boolean isCompatible(ExecutableTrigger other) {
+    return trigger.isCompatible(other.trigger);
+  }
+
+  public ExecutableTrigger getSubTriggerContaining(int index) {
+    checkNotNull(subTriggers);
+    checkState(index > triggerIndex && index < firstIndexAfterSubtree,
+        "Cannot find sub-trigger containing index not in this tree.");
+    ExecutableTrigger previous = null;
+    for (ExecutableTrigger subTrigger : subTriggers) {
+      if (index < subTrigger.triggerIndex) {
+        return previous;
+      }
+      previous = subTrigger;
+    }
+    return previous;
+  }
+
+  /**
+   * Invoke the {@link Trigger#onElement} method for this trigger, ensuring 
that the bits are
+   * properly updated if the trigger finishes.
+   */
+  public void invokeOnElement(Trigger.OnElementContext c) throws Exception {
+    trigger.onElement(c.forTrigger(this));
+  }
+
+  /**
+   * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that 
the bits are properly
+   * updated.
+   */
+  public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception {
+    Trigger.OnMergeContext subContext = c.forTrigger(this);
+    trigger.onMerge(subContext);
+  }
+
+  public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception {
+    return trigger.shouldFire(c.forTrigger(this));
+  }
+
+  public void invokeOnFire(Trigger.TriggerContext c) throws Exception {
+    trigger.onFire(c.forTrigger(this));
+  }
+
+  /**
+   * Invoke clear for the current this trigger.
+   */
+  public void invokeClear(Trigger.TriggerContext c) throws Exception {
+    trigger.clear(c.forTrigger(this));
+  }
+
+  /**
+   * {@link ExecutableTrigger} that enforces the fact that the trigger should 
always FIRE_AND_FINISH
+   * and never just FIRE.
+   */
+  private static class ExecutableOnceTrigger extends ExecutableTrigger {
+
+    public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) {
+      super(trigger, nextUnusedIndex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
new file mode 100644
index 0000000..6666ab9
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * A mutable set which tracks whether any particular {@link ExecutableTrigger} 
is
+ * finished.
+ */
+public interface FinishedTriggers {
+  /**
+   * Returns {@code true} if the trigger is finished.
+   */
+  public boolean isFinished(ExecutableTrigger trigger);
+
+  /**
+   * Sets the fact that the trigger is finished.
+   */
+  public void setFinished(ExecutableTrigger trigger, boolean value);
+
+  /**
+   * Sets the trigger and all of its subtriggers to unfinished.
+   */
+  public void clearRecursively(ExecutableTrigger trigger);
+
+  /**
+   * Create an independent copy of this mutable {@link FinishedTriggers}.
+   */
+  public FinishedTriggers copy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
new file mode 100644
index 0000000..4cd617f
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.util.BitSet;
+
+/**
+ * A {@link FinishedTriggers} implementation based on an underlying {@link 
BitSet}.
+ */
+public class FinishedTriggersBitSet implements FinishedTriggers {
+
+  private final BitSet bitSet;
+
+  private FinishedTriggersBitSet(BitSet bitSet) {
+    this.bitSet = bitSet;
+  }
+
+  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
+    return new FinishedTriggersBitSet(new BitSet(capacity));
+  }
+
+  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
+    return new FinishedTriggersBitSet(bitSet);
+  }
+
+  /**
+   * Returns the underlying {@link BitSet} for this {@link 
FinishedTriggersBitSet}.
+   */
+  public BitSet getBitSet() {
+    return bitSet;
+  }
+
+  @Override
+  public boolean isFinished(ExecutableTrigger trigger) {
+    return bitSet.get(trigger.getTriggerIndex());
+  }
+
+  @Override
+  public void setFinished(ExecutableTrigger trigger, boolean value) {
+    bitSet.set(trigger.getTriggerIndex(), value);
+  }
+
+  @Override
+  public void clearRecursively(ExecutableTrigger trigger) {
+    bitSet.clear(trigger.getTriggerIndex(), 
trigger.getFirstIndexAfterSubtree());
+  }
+
+  @Override
+  public FinishedTriggersBitSet copy() {
+    return new FinishedTriggersBitSet((BitSet) bitSet.clone());
+  }
+}

Reply via email to