http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
deleted file mode 100644
index 8858798..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Repeatedly.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Repeat a trigger, either until some condition is met or forever.
- *
- * <p>For example, to fire after the end of the window, and every time late 
data arrives:
- * <pre> {@code
- *     Repeatedly.forever(AfterWatermark.isPastEndOfWindow());
- * } </pre>
- *
- * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
- * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
- */
-public class Repeatedly extends Trigger {
-
-  private static final int REPEATED = 0;
-
-  /**
-   * Create a composite trigger that repeatedly executes the trigger {@code 
repeated}, firing each
-   * time it fires and ignoring any indications to finish.
-   *
-   * <p>Unless used with {@link Trigger#orFinally} the composite trigger will 
never finish.
-   *
-   * @param repeated the trigger to execute repeatedly.
-   */
-  public static Repeatedly forever(Trigger repeated) {
-    return new Repeatedly(repeated);
-  }
-
-  private Repeatedly(Trigger repeated) {
-    super(Arrays.asList(repeated));
-  }
-
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    getRepeated(c).invokeOnElement(c);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    getRepeated(c).invokeOnMerge(c);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    // This trigger fires once the repeated trigger fires.
-    return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
-  }
-
-  @Override
-  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return new Repeatedly(continuationTriggers.get(REPEATED));
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return getRepeated(context).invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(TriggerContext context) throws Exception {
-    getRepeated(context).invokeOnFire(context);
-
-    if (context.trigger().isFinished(REPEATED)) {
-      // Reset tree will recursively clear the finished bits, and invoke clear.
-      context.forTrigger(getRepeated(context)).trigger().resetTree();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
-  }
-
-  private ExecutableTrigger getRepeated(TriggerContext context) {
-    return context.trigger().subTrigger(REPEATED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
deleted file mode 100644
index 9e2c27d..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ReshuffleTrigger.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-
-/**
- * The trigger used with {@link Reshuffle} which triggers on every element
- * and never buffers state.
- *
- * @param <W> The kind of window that is being reshuffled.
- */
-public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
-
-  public ReshuffleTrigger() {
-    super(null);
-  }
-
-  @Override
-  public void onElement(Trigger.OnElementContext c) { }
-
-  @Override
-  public void onMerge(Trigger.OnMergeContext c) { }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
-    return this;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    throw new UnsupportedOperationException(
-        "ReshuffleTrigger should not be used outside of Reshuffle");
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return true;
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception { }
-
-  @Override
-  public String toString() {
-    return "ReshuffleTrigger()";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
deleted file mode 100644
index a960aa4..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Trigger.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.base.Joiner;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.joda.time.Instant;
-
-/**
- * {@code Trigger}s control when the elements for a specific key and window 
are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform 
and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to 
determine if the
- * {@code Window}s contents should be output.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
- *
- * <p>The elements that are assigned to a window since the last time it was 
fired (or since the
- * window was created) are placed into the current window pane. Triggers are 
evaluated against the
- * elements as they are added. When the root trigger fires, the elements in 
the current pane will be
- * output. When the root trigger finishes (indicating it will never fire 
again), the window is
- * closed and any new elements assigned to that window are discarded.
- *
- * <p>Several predefined {@code Trigger}s are provided:
- * <ul>
- *   <li> {@link AfterWatermark} for firing when the watermark passes a 
timestamp determined from
- *   either the end of the window or the arrival of the first element in a 
pane.
- *   <li> {@link AfterProcessingTime} for firing after some amount of 
processing time has elapsed
- *   (typically since the first element in a pane).
- *   <li> {@link AfterPane} for firing off a property of the elements in the 
current pane, such as
- *   the number of elements that have been assigned to the current pane.
- * </ul>
- *
- * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- *   <li> {@link Repeatedly#forever} to create a trigger that executes 
forever. Any time its
- *   argument finishes it gets reset and starts over. Can be combined with
- *   {@link Trigger#orFinally} to specify a condition that causes the 
repetition to stop.
- *   <li> {@link AfterEach#inOrder} to execute each trigger in sequence, 
firing each (and every)
- *   time that a trigger fires, and advancing to the next trigger in the 
sequence when it finishes.
- *   <li> {@link AfterFirst#of} to create a trigger that fires after at least 
one of its arguments
- *   fires. An {@link AfterFirst} trigger finishes after it fires once.
- *   <li> {@link AfterAll#of} to create a trigger that fires after all least 
one of its arguments
- *   have fired at least once. An {@link AfterAll} trigger finishes after it 
fires once.
- * </ul>
- *
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger 
in the tree is in one
- * of the following states:
- * <ul>
- *   <li> Never Existed - before the trigger has started executing, there is 
no state associated
- *   with it anywhere in the system. A trigger moves to the executing state as 
soon as it
- *   processes in the current pane.
- *   <li> Executing - while the trigger is receiving items and may fire. While 
it is in this state,
- *   it may persist book-keeping information to persisted state, set timers, 
etc.
- *   <li> Finished - after a trigger finishes, all of its book-keeping data is 
cleaned up, and the
- *   system remembers only that it is finished. Entering this state causes us 
to discard any
- *   elements in the buffer for that window, as well.
- * </ul>
- *
- * <p>Once finished, a trigger cannot return itself back to an earlier state, 
however a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be 
recreated
- * between invocations of the callbacks. All important values should be 
persisted using
- * state before the callback returns.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class Trigger implements Serializable {
-
-  /**
-   * Interface for accessing information about the trigger being executed and 
other triggers in the
-   * same tree.
-   */
-  public interface TriggerInfo {
-
-    /**
-     * Returns true if the windowing strategy of the current {@code 
PCollection} is a merging
-     * WindowFn. If true, the trigger execution needs to keep enough 
information to support the
-     * possibility of {@link Trigger#onMerge} being called. If false, {@link 
Trigger#onMerge} will
-     * never be called.
-     */
-    boolean isMerging();
-
-    /**
-     * Access the executable versions of the sub-triggers of the current 
trigger.
-     */
-    Iterable<ExecutableTrigger> subTriggers();
-
-    /**
-     * Access the executable version of the specified sub-trigger.
-     */
-    ExecutableTrigger subTrigger(int subtriggerIndex);
-
-    /**
-     * Returns true if the current trigger is marked finished.
-     */
-    boolean isFinished();
-
-    /**
-     * Return true if the given subtrigger is marked finished.
-     */
-    boolean isFinished(int subtriggerIndex);
-
-    /**
-     * Returns true if all the sub-triggers of the current trigger are marked 
finished.
-     */
-    boolean areAllSubtriggersFinished();
-
-    /**
-     * Returns an iterable over the unfinished sub-triggers of the current 
trigger.
-     */
-    Iterable<ExecutableTrigger> unfinishedSubTriggers();
-
-    /**
-     * Returns the first unfinished sub-trigger.
-     */
-    ExecutableTrigger firstUnfinishedSubTrigger();
-
-    /**
-     * Clears all keyed state for triggers in the current sub-tree and unsets 
all the associated
-     * finished bits.
-     */
-    void resetTree() throws Exception;
-
-    /**
-     * Sets the finished bit for the current trigger.
-     */
-    void setFinished(boolean finished);
-
-    /**
-     * Sets the finished bit for the given sub-trigger.
-     */
-    void setFinished(boolean finished, int subTriggerIndex);
-  }
-
-  /**
-   * Interact with properties of the trigger being executed, with extensions 
to deal with the
-   * merging windows.
-   */
-  public interface MergingTriggerInfo extends TriggerInfo {
-
-    /** Return true if the trigger is finished in any window being merged. */
-    public abstract boolean finishedInAnyMergingWindow();
-
-    /** Return true if the trigger is finished in all windows being merged. */
-    public abstract boolean finishedInAllMergingWindows();
-  }
-
-  /**
-   * Information accessible to all operational hooks in this {@code Trigger}.
-   *
-   * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, 
and
-   * extended with additional information in other methods.
-   */
-  public abstract class TriggerContext {
-
-    /** Returns the interface for accessing trigger info. */
-    public abstract TriggerInfo trigger();
-
-    /** Returns the interface for accessing persistent state. */
-    public abstract StateAccessor<?> state();
-
-    /** The window that the current context is executing in. */
-    public abstract BoundedWindow window();
-
-    /** Create a sub-context for the given sub-trigger. */
-    public abstract TriggerContext forTrigger(ExecutableTrigger trigger);
-
-    /**
-     * Removes the timer set in this trigger context for the given {@link 
Instant}
-     * and {@link TimeDomain}.
-     */
-    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
-    /** The current processing time. */
-    public abstract Instant currentProcessingTime();
-
-    /** The current synchronized upstream processing time or {@code null} if 
unknown. */
-    @Nullable
-    public abstract Instant currentSynchronizedProcessingTime();
-
-    /** The current event time for the input or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentEventTime();
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the 
{@link #onElement}
-   * operational hook.
-   */
-  public abstract class OnElementContext extends TriggerContext {
-    /** The event timestamp of the element currently being processed. */
-    public abstract Instant eventTimestamp();
-
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond 
the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at 
some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current 
window. All
-     * timer firings for a window will be received, but the implementation 
should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnElementContext} for executing the given trigger. */
-    @Override
-    public abstract OnElementContext forTrigger(ExecutableTrigger trigger);
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the 
{@link #onMerge}
-   * operational hook.
-   */
-  public abstract class OnMergeContext extends TriggerContext {
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond 
the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at 
some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current 
window. All
-     * timer firings for a window will be received, but the implementation 
should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnMergeContext} for executing the given trigger. */
-    @Override
-    public abstract OnMergeContext forTrigger(ExecutableTrigger trigger);
-
-    @Override
-    public abstract MergingStateAccessor<?, ?> state();
-
-    @Override
-    public abstract MergingTriggerInfo trigger();
-  }
-
-  @Nullable
-  protected final List<Trigger> subTriggers;
-
-  protected Trigger(@Nullable List<Trigger> subTriggers) {
-    this.subTriggers = subTriggers;
-  }
-
-
-  /**
-   * Called every time an element is incorporated into a window.
-   */
-  public abstract void onElement(OnElementContext c) throws Exception;
-
-  /**
-   * Called immediately after windows have been merged.
-   *
-   * <p>Leaf triggers should update their state by inspecting their status and 
any state
-   * in the merging windows. Composite triggers should update their state by 
calling
-   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and 
applying appropriate logic.
-   *
-   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer 
be finished;
-   * it is the responsibility of the trigger itself to record this fact. It is 
forbidden for
-   * a trigger to become finished due to {@link #onMerge}, as it has not yet 
fired the pending
-   * elements that led to it being ready to fire.
-   *
-   * <p>The implementation does not need to clear out any state associated 
with the old windows.
-   */
-  public abstract void onMerge(OnMergeContext c) throws Exception;
-
-  /**
-   * Returns {@code true} if the current state of the trigger indicates that 
its condition
-   * is satisfied and it is ready to fire.
-   */
-  public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
-  /**
-   * Adjusts the state of the trigger to be ready for the next pane. For 
example, a
-   * {@link Repeatedly} trigger will reset its inner trigger, since it has 
fired.
-   *
-   * <p>If the trigger is finished, it is the responsibility of the trigger 
itself to
-   * record that fact via the {@code context}.
-   */
-  public abstract void onFire(TriggerContext context) throws Exception;
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to 
read from during
-   * an {@link #onElement} call.
-   */
-  public void prefetchOnElement(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnElement(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to 
read from during
-   * an {@link #onMerge} call.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnMerge(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to 
read from during
-   * an {@link #shouldFire} call.
-   */
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchShouldFire(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to 
read from during
-   * an {@link #onFire} call.
-   */
-  public void prefetchOnFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnFire(state);
-      }
-    }
-  }
-
-  /**
-   * Clear any state associated with this trigger in the given window.
-   *
-   * <p>This is called after a trigger has indicated it will never fire again. 
The trigger system
-   * keeps enough information to know that the trigger is finished, so this 
trigger should clear all
-   * of its state.
-   */
-  public void clear(TriggerContext c) throws Exception {
-    if (subTriggers != null) {
-      for (ExecutableTrigger trigger : c.trigger().subTriggers()) {
-        trigger.invokeClear(c);
-      }
-    }
-  }
-
-  public Iterable<Trigger> subTriggers() {
-    return subTriggers;
-  }
-
-  /**
-   * Return a trigger to use after a {@code GroupByKey} to preserve the
-   * intention of this trigger. Specifically, triggers that are time based
-   * and intended to provide speculative results should continue providing
-   * speculative results. Triggers that fire once (or multiple times) should
-   * continue firing once (or multiple times).
-   */
-  public Trigger getContinuationTrigger() {
-    if (subTriggers == null) {
-      return getContinuationTrigger(null);
-    }
-
-    List<Trigger> subTriggerContinuations = new ArrayList<>();
-    for (Trigger subTrigger : subTriggers) {
-      subTriggerContinuations.add(subTrigger.getContinuationTrigger());
-    }
-    return getContinuationTrigger(subTriggerContinuations);
-  }
-
-  /**
-   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For 
convenience, this
-   * is provided the continuation trigger of each of the sub-triggers.
-   */
-  protected abstract Trigger getContinuationTrigger(List<Trigger> 
continuationTriggers);
-
-  /**
-   * Returns a bound in watermark time by which this trigger would have fired 
at least once
-   * for a given window had there been input data.  This is a static property 
of a trigger
-   * that does not depend on its state.
-   *
-   * <p>For triggers that do not fire based on the watermark advancing, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   *
-   * <p>This estimate is used to determine that there are no elements in a 
side-input window, which
-   * causes the default value to be used instead.
-   */
-  public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow 
window);
-
-  /**
-   * Returns whether this performs the same triggering as the given {@code 
Trigger}.
-   */
-  public boolean isCompatible(Trigger other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    if (subTriggers == null) {
-      return other.subTriggers == null;
-    } else if (other.subTriggers == null) {
-      return false;
-    } else if (subTriggers.size() != other.subTriggers.size()) {
-      return false;
-    }
-
-    for (int i = 0; i < subTriggers.size(); i++) {
-      if (!subTriggers.get(i).isCompatible(other.subTriggers.get(i))) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    String simpleName = getClass().getSimpleName();
-    if (getClass().getEnclosingClass() != null) {
-      simpleName = getClass().getEnclosingClass().getSimpleName() + "." + 
simpleName;
-    }
-    if (subTriggers == null || subTriggers.size() == 0) {
-      return simpleName;
-    } else {
-      return simpleName + "(" + Joiner.on(", ").join(subTriggers) + ")";
-    }
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof Trigger)) {
-      return false;
-    }
-    Trigger that = (Trigger) obj;
-    return Objects.equals(getClass(), that.getClass())
-        && Objects.equals(subTriggers, that.subTriggers);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass(), subTriggers);
-  }
-
-  /**
-   * Specify an ending condition for this trigger. If the {@code until} fires 
then the combination
-   * fires.
-   *
-   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} 
fires, and finishes
-   * as soon as either {@code t1} finishes or {@code t2} fires, in which case 
it fires one last time
-   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. 
This means that
-   * {@code t1} may have fired since {@code t2} started, so not all of the 
elements that {@code t2}
-   * has seen are necessarily in the current pane.
-   *
-   * <p>For example the final firing of the following trigger may only have 1 
element:
-   * <pre> {@code
-   * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
-   *     .orFinally(AfterPane.elementCountAtLeast(5))
-   * } </pre>
-   *
-   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code 
t1.orFinally(t2)} is the same
-   * as {@code AfterFirst.of(t1, t2)}.
-   */
-  public Trigger orFinally(OnceTrigger until) {
-    return new OrFinallyTrigger(this, until);
-  }
-
-  /**
-   * {@link Trigger}s that are guaranteed to fire at most once should extend 
from this, rather
-   * than the general {@link Trigger} class to indicate that behavior.
-   */
-  public abstract static class OnceTrigger extends Trigger {
-    protected OnceTrigger(List<Trigger> subTriggers) {
-      super(subTriggers);
-    }
-
-    @Override
-    public final OnceTrigger getContinuationTrigger() {
-      Trigger continuation = super.getContinuationTrigger();
-      if (!(continuation instanceof OnceTrigger)) {
-        throw new IllegalStateException("Continuation of a OnceTrigger must be 
a OnceTrigger");
-      }
-      return (OnceTrigger) continuation;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public final void onFire(TriggerContext context) throws Exception {
-      onOnlyFiring(context);
-      context.trigger().setFinished(true);
-    }
-
-    /**
-     * Called exactly once by {@link #onFire} when the trigger is fired. By 
default,
-     * invokes {@link #onFire} on all subtriggers for which {@link 
#shouldFire} is {@code true}.
-     */
-    protected abstract void onOnlyFiring(TriggerContext context) throws 
Exception;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
deleted file mode 100644
index e09aac2..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerContextFactory.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.joda.time.Instant;
-
-/**
- * Factory for creating instances of the various {@link Trigger} contexts.
- *
- * <p>These contexts are highly interdependent and share many fields; it is 
inadvisable
- * to create them via any means other than this factory class.
- */
-public class TriggerContextFactory<W extends BoundedWindow> {
-
-  private final WindowFn<?, W> windowFn;
-  private StateInternals<?> stateInternals;
-  private final Coder<W> windowCoder;
-
-  public TriggerContextFactory(WindowFn<?, W> windowFn,
-      StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
-    // Future triggers may be able to exploit the active window to state 
address window mapping.
-    this.windowFn = windowFn;
-    this.stateInternals = stateInternals;
-    this.windowCoder = windowFn.windowCoder();
-  }
-
-  public Trigger.TriggerContext base(W window, Timers timers,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
-    return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
-  }
-
-  public Trigger.OnElementContext createOnElementContext(
-      W window, Timers timers, Instant elementTimestamp,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
-    return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, 
elementTimestamp);
-  }
-
-  public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers,
-      ExecutableTrigger rootTrigger, FinishedTriggers finishedSet,
-      Map<W, FinishedTriggers> finishedSets) {
-    return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, 
finishedSets);
-  }
-
-  public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger 
trigger) {
-    return new StateAccessorImpl(window, trigger);
-  }
-
-  public MergingStateAccessor<?, W> createMergingStateAccessor(
-      W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) {
-    return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
-  }
-
-  private class TriggerInfoImpl implements Trigger.TriggerInfo {
-
-    protected final ExecutableTrigger trigger;
-    protected final FinishedTriggers finishedSet;
-    private final Trigger.TriggerContext context;
-
-    public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers 
finishedSet,
-        Trigger.TriggerContext context) {
-      this.trigger = trigger;
-      this.finishedSet = finishedSet;
-      this.context = context;
-    }
-
-    @Override
-    public boolean isMerging() {
-      return !windowFn.isNonMerging();
-    }
-
-    @Override
-    public Iterable<ExecutableTrigger> subTriggers() {
-      return trigger.subTriggers();
-    }
-
-    @Override
-    public ExecutableTrigger subTrigger(int subtriggerIndex) {
-      return trigger.subTriggers().get(subtriggerIndex);
-    }
-
-    @Override
-    public boolean isFinished() {
-      return finishedSet.isFinished(trigger);
-    }
-
-    @Override
-    public boolean isFinished(int subtriggerIndex) {
-      return finishedSet.isFinished(subTrigger(subtriggerIndex));
-    }
-
-    @Override
-    public boolean areAllSubtriggersFinished() {
-      return Iterables.isEmpty(unfinishedSubTriggers());
-    }
-
-    @Override
-    public Iterable<ExecutableTrigger> unfinishedSubTriggers() {
-      return FluentIterable
-          .from(trigger.subTriggers())
-          .filter(new Predicate<ExecutableTrigger>() {
-            @Override
-            public boolean apply(ExecutableTrigger trigger) {
-              return !finishedSet.isFinished(trigger);
-            }
-          });
-    }
-
-    @Override
-    public ExecutableTrigger firstUnfinishedSubTrigger() {
-      for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-        if (!finishedSet.isFinished(subTrigger)) {
-          return subTrigger;
-        }
-      }
-      return null;
-    }
-
-    @Override
-    public void resetTree() throws Exception {
-      finishedSet.clearRecursively(trigger);
-      trigger.invokeClear(context);
-    }
-
-    @Override
-    public void setFinished(boolean finished) {
-      finishedSet.setFinished(trigger, finished);
-    }
-
-    @Override
-    public void setFinished(boolean finished, int subTriggerIndex) {
-      finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
-    }
-  }
-
-  private class TriggerTimers implements Timers {
-
-    private final Timers timers;
-    private final W window;
-
-    public TriggerTimers(W window, Timers timers) {
-      this.timers = timers;
-      this.window = window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timers.setTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      if (timeDomain == TimeDomain.EVENT_TIME
-          && timestamp.equals(window.maxTimestamp())) {
-        // Don't allow triggers to unset the at-max-timestamp timer. This is 
necessary for on-time
-        // state transitions.
-        return;
-      }
-      timers.deleteTimer(timestamp, timeDomain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class MergingTriggerInfoImpl
-      extends TriggerInfoImpl implements Trigger.MergingTriggerInfo {
-
-    private final Map<W, FinishedTriggers> finishedSets;
-
-    public MergingTriggerInfoImpl(
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Trigger.TriggerContext context,
-        Map<W, FinishedTriggers> finishedSets) {
-      super(trigger, finishedSet, context);
-      this.finishedSets = finishedSets;
-    }
-
-    @Override
-    public boolean finishedInAnyMergingWindow() {
-      for (FinishedTriggers finishedSet : finishedSets.values()) {
-        if (finishedSet.isFinished(trigger)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean finishedInAllMergingWindows() {
-      for (FinishedTriggers finishedSet : finishedSets.values()) {
-        if (!finishedSet.isFinished(trigger)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  private class StateAccessorImpl implements StateAccessor<Object> {
-    protected final int triggerIndex;
-    protected final StateNamespace windowNamespace;
-
-    public StateAccessorImpl(
-        W window,
-        ExecutableTrigger trigger) {
-      this.triggerIndex = trigger.getTriggerIndex();
-      this.windowNamespace = namespaceFor(window);
-    }
-
-    protected StateNamespace namespaceFor(W window) {
-      return StateNamespaces.windowAndTrigger(windowCoder, window, 
triggerIndex);
-    }
-
-    @Override
-    public <StateT extends State> StateT access(StateTag<? super Object, 
StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-  }
-
-  private class MergingStateAccessorImpl extends StateAccessorImpl
-  implements MergingStateAccessor<Object, W> {
-    private final Collection<W> activeToBeMerged;
-
-    public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> 
activeToBeMerged,
-        W mergeResult) {
-      super(mergeResult, trigger);
-      this.activeToBeMerged = activeToBeMerged;
-    }
-
-    @Override
-    public <StateT extends State> StateT access(
-        StateTag<? super Object, StateT> address) {
-      return stateInternals.state(windowNamespace, address);
-    }
-
-    @Override
-    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
-        StateTag<? super Object, StateT> address) {
-      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
-      for (W mergingWindow : activeToBeMerged) {
-        StateT stateForWindow = 
stateInternals.state(namespaceFor(mergingWindow), address);
-        builder.put(mergingWindow, stateForWindow);
-      }
-      return builder.build();
-    }
-  }
-
-  private class TriggerContextImpl extends Trigger.TriggerContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-
-    private TriggerContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet) {
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-    }
-
-    @Override
-    public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) {
-      return new TriggerContextImpl(window, timers, trigger, 
triggerInfo.finishedSet);
-    }
-
-    @Override
-    public TriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor<?> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class OnElementContextImpl extends Trigger.OnElementContext {
-
-    private final W window;
-    private final StateAccessorImpl state;
-    private final Timers timers;
-    private final TriggerInfoImpl triggerInfo;
-    private final Instant eventTimestamp;
-
-    private OnElementContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Instant eventTimestamp) {
-      trigger.getSpec().super();
-      this.window = window;
-      this.state = new StateAccessorImpl(window, trigger);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
-      this.eventTimestamp = eventTimestamp;
-    }
-
-
-    @Override
-    public Instant eventTimestamp() {
-      return eventTimestamp;
-    }
-
-    @Override
-    public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) {
-      return new OnElementContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
-    }
-
-    @Override
-    public TriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public StateAccessor<?> state() {
-      return state;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.deleteTimer(timestamp, domain);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-
-  private class OnMergeContextImpl extends Trigger.OnMergeContext {
-    private final MergingStateAccessor<?, W> state;
-    private final W window;
-    private final Collection<W> mergingWindows;
-    private final Timers timers;
-    private final MergingTriggerInfoImpl triggerInfo;
-
-    private OnMergeContextImpl(
-        W window,
-        Timers timers,
-        ExecutableTrigger trigger,
-        FinishedTriggers finishedSet,
-        Map<W, FinishedTriggers> finishedSets) {
-      trigger.getSpec().super();
-      this.mergingWindows = finishedSets.keySet();
-      this.window = window;
-      this.state = new MergingStateAccessorImpl(trigger, mergingWindows, 
window);
-      this.timers = new TriggerTimers(window, timers);
-      this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, 
this, finishedSets);
-    }
-
-    @Override
-    public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) {
-      return new OnMergeContextImpl(
-          window, timers, trigger, triggerInfo.finishedSet, 
triggerInfo.finishedSets);
-    }
-
-    @Override
-    public MergingStateAccessor<?, W> state() {
-      return state;
-    }
-
-    @Override
-    public MergingTriggerInfo trigger() {
-      return triggerInfo;
-    }
-
-    @Override
-    public W window() {
-      return window;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain domain) {
-      timers.setTimer(timestamp, domain);
-
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timers.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timers.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentEventTime() {
-      return timers.currentEventTime();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which 
subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as 
the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} 
wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent 
state.</li>
- *   <li>Restoring the record of which subtriggers are finished from 
persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable 
information about
- * which subtriggers are finished. This class provides the information when 
building the contexts
- * and commits the information when the method of the {@link 
ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", 
BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> 
contextFactory) {
-    checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't 
need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet 
that is all 0 (not
-      // finished) for each trigger in the tree.
-      return 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the 
specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return 
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, 
rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, 
StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = 
contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> 
state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : 
state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    
rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> 
state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow 
modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
-    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws 
Exception {
-    // shouldFire should be false.
-    // However it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set 
to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits 
anyways.
-    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, 
finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't 
need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
new file mode 100644
index 0000000..2f4ad63
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * A {@link TriggerStateMachine} that fires and finishes once after all of its 
sub-triggers
+ * have fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterAllStateMachine extends OnceTriggerStateMachine {
+
+  private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
+   */
+  @SafeVarargs
+  public static OnceTriggerStateMachine of(OnceTriggerStateMachine... 
triggers) {
+    return new 
AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : 
c.trigger().unfinishedSubTriggers()) {
+      // Since subTriggers are all OnceTriggers, they must either CONTINUE or 
FIRE_AND_FINISH.
+      // invokeElement will automatically mark the finish bit if they return 
FIRE_AND_FINISH.
+      subTrigger.invokeOnElement(c);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) 
{
+      subTrigger.invokeOnMerge(c);
+    }
+    boolean allFinished = true;
+    for (ExecutableTriggerStateMachine subTrigger1 : 
c.trigger().subTriggers()) {
+      allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
+    }
+    c.trigger().setFinished(allFinished);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@code true} if all subtriggers return {@code true}.
+   */
+  @Override
+  public boolean shouldFire(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      if (!context.forTrigger(subtrigger).trigger().isFinished()
+          && !subtrigger.invokeShouldFire(context)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to 
{@link #shouldFire}
+   * because they all must be ready to fire.
+   */
+  @Override
+  public void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      subtrigger.invokeOnFire(context);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterAll.of(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
new file mode 100644
index 0000000..a6616fa
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+import org.joda.time.format.PeriodFormatter;
+
+/**
+ * A base class for triggers that happen after a processing time delay from 
the arrival
+ * of the first element in a pane.
+ *
+ * <p>This class is for internal use only and may change at any time.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public abstract class AfterDelayFromFirstElementStateMachine extends 
OnceTriggerStateMachine {
+
+  protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
+      ImmutableList.<SerializableFunction<Instant, Instant>>of();
+
+  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
+                                              Combine.Holder<Instant>, 
Instant>> DELAYED_UNTIL_TAG =
+      
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+
+  private static final PeriodFormatter PERIOD_FORMATTER = 
PeriodFormat.wordBased(Locale.ENGLISH);
+
+  /**
+   * To complete an implementation, return the desired time from the 
TriggerContext.
+   */
+  @Nullable
+  public abstract Instant getCurrentTime(TriggerStateMachine.TriggerContext 
context);
+
+  /**
+   * To complete an implementation, return a new instance like this one, but 
incorporating
+   * the provided timestamp mapping functions. Generally should be used by 
calling the
+   * constructor of this class from the constructor of the subclass.
+   */
+  protected abstract AfterDelayFromFirstElementStateMachine newWith(
+      List<SerializableFunction<Instant, Instant>> transform);
+
+  /**
+   * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed 
in sequence. The
+   * overall mapping for an instance `instance` is `m_n(... 
m3(m2(m1(instant))`,
+   * implemented via #computeTargetTimestamp
+   */
+  protected final List<SerializableFunction<Instant, Instant>> 
timestampMappers;
+
+  private final TimeDomain timeDomain;
+
+  public AfterDelayFromFirstElementStateMachine(
+      TimeDomain timeDomain,
+      List<SerializableFunction<Instant, Instant>> timestampMappers) {
+    super(null);
+    this.timestampMappers = timestampMappers;
+    this.timeDomain = timeDomain;
+  }
+
+  private Instant getTargetTimestamp(OnElementContext c) {
+    return computeTargetTimestamp(c.currentProcessingTime());
+  }
+
+  /**
+   * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
+   * than the timestamp.
+   *
+   * <p>TODO: Consider sharing this with FixedWindows, and bring over the 
equivalent of
+   * CalendarWindows.
+   */
+  public AfterDelayFromFirstElementStateMachine alignedTo(
+      final Duration size, final Instant offset) {
+    return newWith(new AlignFn(size, offset));
+  }
+
+  /**
+   * Aligns the time to be the smallest multiple of {@code size} greater than 
the timestamp
+   * since the epoch.
+   */
+  public AfterDelayFromFirstElementStateMachine alignedTo(final Duration size) 
{
+    return alignedTo(size, new Instant(0));
+  }
+
+  /**
+   * Adds some delay to the original target time.
+   *
+   * @param delay the delay to add
+   * @return An updated time trigger that will wait the additional time before 
firing.
+   */
+  public AfterDelayFromFirstElementStateMachine plusDelayOf(final Duration 
delay) {
+    return newWith(new DelayFn(delay));
+  }
+
+  @Override
+  public boolean isCompatible(TriggerStateMachine other) {
+    if (!getClass().equals(other.getClass())) {
+      return false;
+    }
+
+    AfterDelayFromFirstElementStateMachine that = 
(AfterDelayFromFirstElementStateMachine) other;
+    return this.timestampMappers.equals(that.timestampMappers);
+  }
+
+
+  private AfterDelayFromFirstElementStateMachine newWith(
+      SerializableFunction<Instant, Instant> timestampMapper) {
+    return newWith(
+        ImmutableList.<SerializableFunction<Instant, Instant>>builder()
+            .addAll(timestampMappers)
+            .add(timestampMapper)
+            .build());
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchOnElement(StateAccessor<?> state) {
+    state.access(DELAYED_UNTIL_TAG).readLater();
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    CombiningState<Instant, Instant> delayUntilState = 
c.state().access(DELAYED_UNTIL_TAG);
+    Instant oldDelayUntil = delayUntilState.read();
+
+    // Since processing time can only advance, resulting in target wake-up 
times we would
+    // ignore anyhow, we don't bother with it if it is already set.
+    if (oldDelayUntil != null) {
+      return;
+    }
+
+    Instant targetTimestamp = getTargetTimestamp(c);
+    delayUntilState.add(targetTimestamp);
+    c.setTimer(targetTimestamp, timeDomain);
+  }
+
+  @Override
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    super.prefetchOnMerge(state);
+    StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    // NOTE: We could try to delete all timers which are still active, but we 
would
+    // need access to a timer context for each merging window.
+    // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, 
Instant> state :
+    //    c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
+    //   Instant timestamp = state.get().read();
+    //   if (timestamp != null) {
+    //     <context for merging window>.deleteTimer(timestamp, timeDomain);
+    //   }
+    // }
+    // Instead let them fire and be ignored.
+
+    // If the trigger is already finished, there is no way it will become 
re-activated
+    if (c.trigger().isFinished()) {
+      StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
+      // NOTE: We do not attempt to delete  the timers.
+      return;
+    }
+
+    // Determine the earliest point across all the windows, and delay to that.
+    StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
+
+    Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
+    if (earliestTargetTime != null) {
+      c.setTimer(earliestTargetTime, timeDomain);
+    }
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    state.access(DELAYED_UNTIL_TAG).readLater();
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception {
+    c.state().access(DELAYED_UNTIL_TAG).clear();
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+    Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
+    return delayedUntil != null
+        && getCurrentTime(context) != null
+        && getCurrentTime(context).isAfter(delayedUntil);
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) 
throws Exception {
+    clear(context);
+  }
+
+  protected Instant computeTargetTimestamp(Instant time) {
+    Instant result = time;
+    for (SerializableFunction<Instant, Instant> timestampMapper : 
timestampMappers) {
+      result = timestampMapper.apply(result);
+    }
+    return result;
+  }
+
+  /**
+   * A {@link SerializableFunction} to delay the timestamp at which this 
triggers fires.
+   */
+  private static final class DelayFn implements SerializableFunction<Instant, 
Instant> {
+    private final Duration delay;
+
+    public DelayFn(Duration delay) {
+      this.delay = delay;
+    }
+
+    @Override
+    public Instant apply(Instant input) {
+      return input.plus(delay);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+      if (object == this) {
+        return true;
+      }
+
+      if (!(object instanceof DelayFn)) {
+        return false;
+      }
+
+      return this.delay.equals(((DelayFn) object).delay);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(delay);
+    }
+
+    @Override
+    public String toString() {
+      return PERIOD_FORMATTER.print(delay.toPeriod());
+    }
+  }
+
+  /**
+   * A {@link SerializableFunction} to align an instant to the nearest 
interval boundary.
+   */
+  static final class AlignFn implements SerializableFunction<Instant, Instant> 
{
+    private final Duration size;
+    private final Instant offset;
+
+
+    /**
+     * Aligns timestamps to the smallest multiple of {@code size} since the 
{@code offset} greater
+     * than the timestamp.
+     */
+    public AlignFn(Duration size, Instant offset) {
+      this.size = size;
+      this.offset = offset;
+    }
+
+    @Override
+    public Instant apply(Instant point) {
+      long millisSinceStart = new Duration(offset, point).getMillis() % 
size.getMillis();
+      return millisSinceStart == 0 ? point : 
point.plus(size).minus(millisSinceStart);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+      if (object == this) {
+        return true;
+      }
+
+      if (!(object instanceof AlignFn)) {
+        return false;
+      }
+
+      AlignFn other = (AlignFn) object;
+      return other.size.equals(this.size)
+          && other.offset.equals(this.offset);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(size, offset);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
new file mode 100644
index 0000000..140ac75
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A composite {@link TriggerStateMachine} that executes its sub-triggers in 
order.
+ * Only one sub-trigger is executing at a time,
+ * and any time it fires the {@code AfterEach} fires. When the currently 
executing
+ * sub-trigger finishes, the {@code AfterEach} starts executing the next 
sub-trigger.
+ *
+ * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the 
sub-triggers have finished.
+ *
+ * <p>The following properties hold:
+ * <ul>
+ *   <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the 
same as
+ *   {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, 
AfterEach.inOrder(b, c)}.
+ *   <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same 
as
+ *   {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
+ * </ul>
+ */
+public class AfterEachStateMachine extends TriggerStateMachine {
+
+  private AfterEachStateMachine(List<TriggerStateMachine> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
+   */
+  @SafeVarargs
+  public static TriggerStateMachine inOrder(TriggerStateMachine... triggers) {
+    return new 
AfterEachStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    if (!c.trigger().isMerging()) {
+      // If merges are not possible, we need only run the first unfinished 
subtrigger
+      c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+    } else {
+      // If merges are possible, we need to run all subtriggers in parallel
+      for (ExecutableTriggerStateMachine subTrigger :  
c.trigger().subTriggers()) {
+        // Even if the subTrigger is done, it may be revived via merging and 
must have
+        // adequate state.
+        subTrigger.invokeOnElement(c);
+      }
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext context) throws Exception {
+    // If merging makes a subtrigger no-longer-finished, it will automatically
+    // begin participating in shouldFire and onFire appropriately.
+
+    // All the following triggers are retroactively "not started" but that is
+    // also automatic because they are cleared whenever this trigger
+    // fires.
+    boolean priorTriggersAllFinished = true;
+    for (ExecutableTriggerStateMachine subTrigger : 
context.trigger().subTriggers()) {
+      if (priorTriggersAllFinished) {
+        subTrigger.invokeOnMerge(context);
+        priorTriggersAllFinished &= 
context.forTrigger(subTrigger).trigger().isFinished();
+      } else {
+        subTrigger.invokeClear(context);
+      }
+    }
+    updateFinishedState(context);
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+    ExecutableTriggerStateMachine firstUnfinished = 
context.trigger().firstUnfinishedSubTrigger();
+    return firstUnfinished.invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+    context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
+
+    // Reset all subtriggers if in a merging context; any may be revived by 
merging so they are
+    // all run in parallel for each pending pane.
+    if (context.trigger().isMerging()) {
+      for (ExecutableTriggerStateMachine subTrigger : 
context.trigger().subTriggers()) {
+        subTrigger.invokeClear(context);
+      }
+    }
+
+    updateFinishedState(context);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+
+  private void updateFinishedState(TriggerContext context) {
+    
context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == 
null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
new file mode 100644
index 0000000..272c278
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Create a composite {@link TriggerStateMachine} that fires once after at 
least one of its
+ * sub-triggers have fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterFirstStateMachine extends OnceTriggerStateMachine {
+
+  AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) {
+    super(subTriggers);
+    checkArgument(subTriggers.size() > 1);
+  }
+
+  /**
+   * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
+   */
+  @SafeVarargs
+  public static OnceTriggerStateMachine of(
+      OnceTriggerStateMachine... triggers) {
+    return new 
AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) 
{
+      subTrigger.invokeOnElement(c);
+    }
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) 
{
+      subTrigger.invokeOnMerge(c);
+    }
+    updateFinishedStatus(c);
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      if (context.forTrigger(subtrigger).trigger().isFinished()
+          || subtrigger.invokeShouldFire(context)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      TriggerContext subContext = context.forTrigger(subtrigger);
+      if (subtrigger.invokeShouldFire(subContext)) {
+        // If the trigger is ready to fire, then do whatever it needs to do.
+        subtrigger.invokeOnFire(subContext);
+      } else {
+        // If the trigger is not ready to fire, it is nonetheless true that 
whatever
+        // pending pane it was tracking is now gone.
+        subtrigger.invokeClear(subContext);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("AfterFirst.of(");
+    Joiner.on(", ").appendTo(builder, subTriggers);
+    builder.append(")");
+
+    return builder.toString();
+  }
+
+  private void updateFinishedStatus(TriggerContext c) {
+    boolean anyFinished = false;
+    for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) 
{
+      anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+    }
+    c.trigger().setFinished(anyFinished);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
new file mode 100644
index 0000000..723aba6
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Objects;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * {@link TriggerStateMachine}s that fire based on properties of the elements 
in the current pane.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterPaneStateMachine extends OnceTriggerStateMachine {
+
+private static final StateTag<Object, AccumulatorCombiningState<Long, long[], 
Long>>
+      ELEMENTS_IN_PANE_TAG =
+      
StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+          "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+  private final int countElems;
+
+  private AfterPaneStateMachine(int countElems) {
+    super(null);
+    this.countElems = countElems;
+  }
+
+  /**
+   * Creates a trigger that fires when the pane contains at least {@code 
countElems} elements.
+   */
+  public static AfterPaneStateMachine elementCountAtLeast(int countElems) {
+    return new AfterPaneStateMachine(countElems);
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
+  }
+
+  @Override
+  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+    super.prefetchOnMerge(state);
+    StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext context) throws Exception {
+    // If we've already received enough elements and finished in some window,
+    // then this trigger is just finished.
+    if (context.trigger().finishedInAnyMergingWindow()) {
+      context.trigger().setFinished(true);
+      StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
+      return;
+    }
+
+    // Otherwise, compute the sum of elements in all the active panes.
+    StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
+  }
+
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", 
justification =
+      "prefetch side effect")
+  public void prefetchShouldFire(StateAccessor<?> state) {
+    state.access(ELEMENTS_IN_PANE_TAG).readLater();
+  }
+
+  @Override
+  public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+    long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
+    return count >= countElems;
+  }
+
+  @Override
+  public void clear(TriggerContext c) throws Exception {
+    c.state().access(ELEMENTS_IN_PANE_TAG).clear();
+  }
+
+  @Override
+  public boolean isCompatible(TriggerStateMachine other) {
+    return this.equals(other);
+  }
+
+  @Override
+  public String toString() {
+    return "AfterPane.elementCountAtLeast(" + countElems + ")";
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof AfterPaneStateMachine)) {
+      return false;
+    }
+    AfterPaneStateMachine that = (AfterPaneStateMachine) obj;
+    return this.countElems == that.countElems;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(countElems);
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) 
throws Exception {
+    clear(context);
+  }
+}

Reply via email to