http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
new file mode 100644
index 0000000..0ffbbca
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BitSetCoder;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Executes a trigger while managing persistence of information about which 
subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as 
the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ *   <li>Invoking the trigger's methods via its {@link 
ExecutableTriggerStateMachine} wrapper by
+ *       constructing the appropriate trigger contexts.</li>
+ *   <li>Committing a record of which subtriggers are finished to persistent 
state.</li>
+ *   <li>Restoring the record of which subtriggers are finished from 
persistent state.</li>
+ *   <li>Clearing out the persisted finished set when a caller indicates
+ *       (via {@link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable 
information about
+ * which subtriggers are finished. This class provides the information when 
building the contexts
+ * and commits the information when the method of the {@link 
ExecutableTriggerStateMachine} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerStateMachineRunner<W extends BoundedWindow> {
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("closed", 
BitSetCoder.of()));
+
+  private final ExecutableTriggerStateMachine rootTrigger;
+  private final TriggerStateMachineContextFactory<W> contextFactory;
+
+  public TriggerStateMachineRunner(
+      ExecutableTriggerStateMachine rootTrigger,
+      TriggerStateMachineContextFactory<W> contextFactory) {
+    checkState(rootTrigger.getTriggerIndex() == 0);
+    this.rootTrigger = rootTrigger;
+    this.contextFactory = contextFactory;
+  }
+
+  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+    if (!isFinishedSetNeeded()) {
+      // If no trigger in the tree will ever have finished bits, then we don't 
need to read them.
+      // So that the code can be agnostic to that fact, we create a BitSet 
that is all 0 (not
+      // finished) for each trigger in the tree.
+      return 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+    }
+
+    BitSet bitSet = state.read();
+    return bitSet == null
+        ? 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
+            : FinishedTriggersBitSet.fromBitSet(bitSet);
+  }
+
+
+  private void clearFinishedBits(ValueState<BitSet> state) {
+    if (!isFinishedSetNeeded()) {
+      // Nothing to clear.
+      return;
+    }
+    state.clear();
+  }
+
+  /** Return true if the trigger is closed in the window corresponding to the 
specified state. */
+  public boolean isClosed(StateAccessor<?> state) {
+    return 
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+  }
+
+  public void prefetchForValue(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    rootTrigger.getSpec().prefetchOnElement(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  public void prefetchOnFire(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, 
rootTrigger));
+  }
+
+  public void prefetchShouldFire(W window, StateAccessor<?> state) {
+    if (isFinishedSetNeeded()) {
+      state.access(FINISHED_BITS_TAG).readLater();
+    }
+    rootTrigger.getSpec().prefetchShouldFire(
+        contextFactory.createStateAccessor(window, rootTrigger));
+  }
+
+  /**
+   * Run the trigger logic to deal with a new value.
+   */
+  public void processValue(W window, Instant timestamp, Timers timers, 
StateAccessor<?> state)
+      throws Exception {
+    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
+    FinishedTriggersBitSet finishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+    TriggerStateMachine.OnElementContext triggerContext = 
contextFactory.createOnElementContext(
+        window, timers, timestamp, rootTrigger, finishedSet);
+    rootTrigger.invokeOnElement(triggerContext);
+    persistFinishedSet(state, finishedSet);
+  }
+
+  public void prefetchForMerge(
+      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> 
state) {
+    if (isFinishedSetNeeded()) {
+      for (ValueState<?> value : 
state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+        value.readLater();
+      }
+    }
+    
rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
+        window, mergingWindows, rootTrigger));
+  }
+
+  /**
+   * Run the trigger merging logic as part of executing the specified merge.
+   */
+  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> 
state) throws Exception {
+    // Clone so that we can detect changes and so that changes here don't 
pollute merging.
+    FinishedTriggersBitSet finishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+    // And read the finished bits in each merging window.
+    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
+    for (Map.Entry<W, ValueState<BitSet>> entry :
+        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+      // Don't need to clone these, since the trigger context doesn't allow 
modification
+      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
+      // Clear the underlying finished bits.
+      clearFinishedBits(entry.getValue());
+    }
+    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
+
+    TriggerStateMachine.OnMergeContext mergeContext = 
contextFactory.createOnMergeContext(
+        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
+
+    // Run the merge from the trigger
+    rootTrigger.invokeOnMerge(mergeContext);
+
+    persistFinishedSet(state, finishedSet);
+  }
+
+  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
+    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+    TriggerStateMachine.TriggerContext context = contextFactory.base(window, 
timers,
+        rootTrigger, finishedSet);
+    return rootTrigger.invokeShouldFire(context);
+  }
+
+  public void onFire(W window, Timers timers, StateAccessor<?> state) throws 
Exception {
+    // shouldFire should presumably be true here, since callers check it before firing.
+    // However it is too expensive to assert.
+    FinishedTriggersBitSet finishedSet =
+        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+    TriggerStateMachine.TriggerContext context = contextFactory.base(window, 
timers,
+        rootTrigger, finishedSet);
+    rootTrigger.invokeOnFire(context);
+    persistFinishedSet(state, finishedSet);
+  }
+
+  private void persistFinishedSet(
+      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+    if (!isFinishedSetNeeded()) {
+      return;
+    }
+
+    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+      if (modifiedFinishedSet.getBitSet().isEmpty()) {
+        finishedSetState.clear();
+      } else {
+        finishedSetState.write(modifiedFinishedSet.getBitSet());
+      }
+    }
+  }
+
+  /**
+   * Clear the finished bits.
+   */
+  public void clearFinished(StateAccessor<?> state) {
+    clearFinishedBits(state.access(FINISHED_BITS_TAG));
+  }
+
+  /**
+   * Clear the state used for executing triggers, but leave the finished set 
to indicate
+   * the window is closed.
+   */
+  public void clearState(W window, Timers timers, StateAccessor<?> state) 
throws Exception {
+    // Don't need to clone, because we'll be clearing the finished bits 
anyways.
+    FinishedTriggers finishedSet = 
readFinishedBits(state.access(FINISHED_BITS_TAG));
+    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, 
finishedSet));
+  }
+
+  private boolean isFinishedSetNeeded() {
+    // TODO: If we know that no trigger in the tree will ever finish, we don't 
need to do the
+    // lookup. Right now, we special case this for the DefaultTrigger.
+    return !(rootTrigger.getSpec() instanceof DefaultTriggerStateMachine);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
new file mode 100644
index 0000000..b7c7050
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State machine implementations for triggers, called "triggers" because
+ * they react to events.
+ */
+package org.apache.beam.runners.core.triggers;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
deleted file mode 100644
index b591229..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterAll}.
- */
-@RunWith(JUnit4.class)
-public class AfterAllTest {
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-
-  @Test
-  public void testT1FiresFirst() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(1),
-            AfterPane.elementCountAtLeast(2)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testT2FiresFirst() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(2),
-            AfterPane.elementCountAtLeast(1)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that the AfterAll properly unsets finished bits when a merge 
causing it to become
-   * unfinished.
-   */
-  @Test
-  public void testOnMergeRewinds() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterAll.of(
-                AfterWatermark.pastEndOfWindow(),
-                AfterPane.elementCountAtLeast(1)),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
-
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
-
-    // Finish the AfterAll in the first window
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Merge them; the AfterAll should not be finished
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
-    assertFalse(tester.isMarkedFinished(mergedWindow));
-
-    // Confirm that we are back on the first trigger by probing that it is not 
ready to fire
-    // after an element (with merging)
-    tester.injectElements(3);
-    tester.mergeWindows();
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Fire the AfterAll in the merged window
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-
-    // Confirm that we are on the second trigger by probing
-    tester.injectElements(2);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-    tester.injectElements(2);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterAll.of(AfterWatermark.pastEndOfWindow(), 
AfterPane.elementCountAtLeast(1))
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterAll = AfterAll.of(trigger1, trigger2);
-    assertEquals(
-        AfterAll.of(trigger1.getContinuationTrigger(), 
trigger2.getContinuationTrigger()),
-        afterAll.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterAll.of(StubTrigger.named("t1"), 
StubTrigger.named("t2"));
-    assertEquals("AfterAll.of(t1, t2)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
deleted file mode 100644
index c413c6e..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterEach}.
- */
-@RunWith(JUnit4.class)
-public class AfterEachTest {
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-
-  @Before
-  public void initMocks() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  /**
-   * Tests that the {@link AfterEach} trigger fires and finishes the first 
trigger then the second.
-   */
-  @Test
-  public void testAfterEachInSequence() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(2))
-                .orFinally(AfterPane.elementCountAtLeast(3)),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(5))
-                .orFinally(AfterWatermark.pastEndOfWindow())),
-            FixedWindows.of(Duration.millis(10)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
-
-    // AfterCount(2) not ready
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    // AfterCount(2) ready, not finished
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // orFinally(AfterCount(3)) ready and will finish the first
-    tester.injectElements(1, 2, 3);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // Now running as the second trigger
-    assertFalse(tester.shouldFire(window));
-    // This quantity of elements would fire and finish if it were erroneously 
still the first
-    tester.injectElements(1, 2, 3, 4);
-    assertFalse(tester.shouldFire(window));
-
-    // Now fire once
-    tester.injectElements(5);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // This time advance the watermark to finish the whole mess.
-    tester.advanceInputWatermark(new Instant(10));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(new Instant(9),
-        AfterEach.inOrder(AfterWatermark.pastEndOfWindow(),
-                          AfterPane.elementCountAtLeast(4))
-            .getWatermarkThatGuaranteesFiring(window));
-
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterEach.inOrder(AfterPane.elementCountAtLeast(2), 
AfterWatermark.pastEndOfWindow())
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterEach = AfterEach.inOrder(trigger1, trigger2);
-    assertEquals(
-        Repeatedly.forever(AfterFirst.of(
-            trigger1.getContinuationTrigger(), 
trigger2.getContinuationTrigger())),
-        afterEach.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterEach.inOrder(
-        StubTrigger.named("t1"),
-        StubTrigger.named("t2"),
-        StubTrigger.named("t3"));
-
-    assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
deleted file mode 100644
index 415060b..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterFirst}.
- */
-@RunWith(JUnit4.class)
-public class AfterFirstTest {
-
-  @Mock private OnceTrigger mockTrigger1;
-  @Mock private OnceTrigger mockTrigger2;
-  private SimpleTriggerTester<IntervalWindow> tester;
-  private static Trigger.TriggerContext anyTriggerContext() {
-    return Mockito.<Trigger.TriggerContext>any();
-  }
-
-  @Before
-  public void initMocks() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testNeitherShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterFirst.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
-    assertFalse(tester.shouldFire(window)); // should not fire
-    assertFalse(tester.isMarkedFinished(window)); // not finished
-  }
-
-  @Test
-  public void testOnlyT1ShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterFirst.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testOnlyT2ShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-    AfterFirst.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window); // now finished
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testBothShouldFireFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-    AfterFirst.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
-
-    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
-    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(window)); // should fire
-
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that if the first trigger rewinds to be non-finished in the merged 
window,
-   * then it becomes the currently active trigger again, with real triggers.
-   */
-  @Test
-  public void testShouldFireAfterMerge() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterFirst.of(AfterPane.elementCountAtLeast(5),
-                AfterWatermark.pastEndOfWindow()),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    // Finished the AfterFirst in the first window
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Set up second window where it is not done
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Merge them, if the merged window were on the second trigger, it would 
be ready
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Now adding 3 more makes the AfterFirst ready to fire
-    tester.injectElements(1, 2, 3, 4, 5);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(new Instant(9),
-        AfterFirst.of(AfterWatermark.pastEndOfWindow(), 
AfterPane.elementCountAtLeast(4))
-            .getWatermarkThatGuaranteesFiring(window));
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterFirst.of(AfterPane.elementCountAtLeast(2), 
AfterPane.elementCountAtLeast(1))
-            .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
-    Trigger afterFirst = AfterFirst.of(trigger1, trigger2);
-    assertEquals(
-        AfterFirst.of(trigger1.getContinuationTrigger(), 
trigger2.getContinuationTrigger()),
-        afterFirst.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), 
StubTrigger.named("t2"));
-    assertEquals("AfterFirst.of(t1, t2)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
deleted file mode 100644
index 38d030e..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterPane}.
- */
-@RunWith(JUnit4.class)
-public class AfterPaneTest {
-
-  SimpleTriggerTester<IntervalWindow> tester;
-  /**
-   * Tests that the trigger does fire when enough elements are in a window, and that it only
-   * fires that window (no leakage).
-   */
-  @Test
-  public void testAfterPaneElementCountFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1); // [0, 10)
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2); // [0, 10)
-    tester.injectElements(11); // [10, 20)
-
-    assertTrue(tester.shouldFire(window)); // ready to fire
-    tester.fireIfShouldFire(window); // and finished
-    assertTrue(tester.isMarkedFinished(window));
-
-    // But don't finish the other window
-    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
-  }
-
-  @Test
-  public void testClear() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1, 2, 3);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.clearState(window);
-    tester.assertCleared(window);
-  }
-
-  @Test
-  public void testAfterPaneElementCountSessions() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterPane.elementCountAtLeast(2),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(
-        1, // in [1, 11)
-        2); // in [2, 12)
-
-    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
-    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
-
-    tester.mergeWindows();
-
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
-    assertTrue(tester.shouldFire(mergedWindow));
-    tester.fireIfShouldFire(mergedWindow);
-    assertTrue(tester.isMarkedFinished(mergedWindow));
-
-    // Because we closed the previous window, we don't have it around to merge with. So there
-    // will be a new FIRE_AND_FINISH result.
-    tester.injectElements(
-        7,  // in [7, 17)
-        9); // in [9, 19)
-
-    tester.mergeWindows();
-
-    IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
-    assertTrue(tester.shouldFire(newMergedWindow));
-    tester.fireIfShouldFire(newMergedWindow);
-    assertTrue(tester.isMarkedFinished(newMergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
-            new IntervalWindow(new Instant(0), new Instant(10))));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    assertEquals(
-        AfterPane.elementCountAtLeast(1),
-        AfterPane.elementCountAtLeast(100).getContinuationTrigger());
-    assertEquals(
-        AfterPane.elementCountAtLeast(1),
-        AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterPane.elementCountAtLeast(5);
-    assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
deleted file mode 100644
index 13a7acf..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterProcessingTimeTest {
-
-  /**
-   * Tests the basic property that the trigger does wait for processing time to be
-   * far enough advanced.
-   */
-  @Test
-  public void testAfterProcessingTimeFixedWindows() throws Exception {
-    Duration windowDuration = Duration.millis(10);
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        FixedWindows.of(windowDuration));
-
-    tester.advanceProcessingTime(new Instant(10));
-
-    // Timer at 15
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.advanceProcessingTime(new Instant(12));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    // Load up elements in the next window, timer at 17 for them
-    tester.injectElements(11, 12, 13);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Not quite time to fire
-    tester.advanceProcessingTime(new Instant(14));
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
-    tester.injectElements(2, 3);
-
-    // Advance past the first timer and fire, finishing the first window
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    assertTrue(tester.isMarkedFinished(firstWindow));
-
-    // The next window fires and finishes now
-    tester.advanceProcessingTime(new Instant(18));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(secondWindow);
-    assertTrue(tester.isMarkedFinished(secondWindow));
-  }
-
-  /**
-   * Tests that when windows merge, if the trigger is waiting for "N millis after the first
-   * element" that it is relative to the earlier of the two merged windows.
-   */
-  @Test
-  public void testClear() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        FixedWindows.of(Duration.millis(10)));
-
-    tester.injectElements(1, 2, 3);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.clearState(window);
-    tester.assertCleared(window);
-  }
-
-  @Test
-  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.advanceProcessingTime(new Instant(10));
-    tester.injectElements(1); // in [1, 11), timer for 15
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    tester.advanceProcessingTime(new Instant(12));
-    tester.injectElements(3); // in [3, 13), timer for 17
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
-            new IntervalWindow(new Instant(0), new Instant(10))));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger firstElementPlus1 =
-        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
-    assertEquals(
-        new AfterSynchronizedProcessingTime(),
-        firstElementPlus1.getContinuationTrigger());
-  }
-
-  /**
-   * Basic test of compatibility check between identical triggers.
-   */
-  @Test
-  public void testCompatibilityIdentical() throws Exception {
-    Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(1L));
-    Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(1L));
-    assertTrue(t1.isCompatible(t2));
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
-    assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
-  }
-
-  @Test
-  public void testWithDelayToString() {
-    Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
-        .plusDelayOf(Duration.standardMinutes(5));
-
-    assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
-        trigger.toString());
-  }
-
-  @Test
-  public void testBuiltUpToString() {
-    Trigger trigger = AfterWatermark.pastEndOfWindow()
-        .withLateFirings(AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.standardMinutes(10)));
-
-    String expected = "AfterWatermark.pastEndOfWindow()"
-        + ".withLateFirings(AfterProcessingTime"
-        + ".pastFirstElementInPane()"
-        + ".plusDelayOf(10 minutes))";
-
-    assertEquals(expected, trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
deleted file mode 100644
index 7e6e938..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterSynchronizedProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterSynchronizedProcessingTimeTest {
-
-  private Trigger underTest = new AfterSynchronizedProcessingTime();
-
-  @Test
-  public void testAfterProcessingTimeWithFixedWindows() throws Exception {
-    Duration windowDuration = Duration.millis(10);
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        FixedWindows.of(windowDuration));
-
-    tester.advanceProcessingTime(new Instant(10));
-
-    // Timer at 15
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
-    tester.advanceProcessingTime(new Instant(12));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    // Load up elements in the next window, timer at 17 for them
-    tester.injectElements(11, 12, 13);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Not quite time to fire
-    tester.advanceProcessingTime(new Instant(14));
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
-    tester.injectElements(2, 3);
-
-    // Advance past the first timer and fire, finishing the first window
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    assertTrue(tester.isMarkedFinished(firstWindow));
-
-    // The next window fires and finishes now
-    tester.advanceProcessingTime(new Instant(18));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(secondWindow);
-    assertTrue(tester.isMarkedFinished(secondWindow));
-  }
-
-  @Test
-  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
-    Duration windowDuration = Duration.millis(10);
-    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
-        AfterProcessingTime
-            .pastFirstElementInPane()
-            .plusDelayOf(Duration.millis(5)),
-        Sessions.withGapDuration(windowDuration));
-
-    tester.advanceProcessingTime(new Instant(10));
-    tester.injectElements(1); // in [1, 11), timer for 15
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    tester.advanceProcessingTime(new Instant(12));
-    tester.injectElements(3); // in [3, 13), timer for 17
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
-    tester.advanceProcessingTime(new Instant(16));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        underTest.getWatermarkThatGuaranteesFiring(
-            new IntervalWindow(new Instant(0), new Instant(10))));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    assertEquals(underTest, underTest.getContinuationTrigger());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
deleted file mode 100644
index 084027b..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests the {@link AfterWatermark} triggers.
- */
-@RunWith(JUnit4.class)
-public class AfterWatermarkTest {
-
-  @Mock private OnceTrigger mockEarly;
-  @Mock private OnceTrigger mockLate;
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-  private static Trigger.TriggerContext anyTriggerContext() {
-    return Mockito.<Trigger.TriggerContext>any();
-  }
-  private static Trigger.OnElementContext anyElementContext() {
-    return Mockito.<Trigger.OnElementContext>any();
-  }
-
-  private void injectElements(int... elements) throws Exception {
-    for (int element : elements) {
-      doNothing().when(mockEarly).onElement(anyElementContext());
-      doNothing().when(mockLate).onElement(anyElementContext());
-      tester.injectElements(element);
-    }
-  }
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
-      throws Exception {
-
-    // Don't fire due to mock saying no
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
-    assertFalse(tester.shouldFire(window)); // not ready
-
-    // Fire due to mock trigger; early trigger is required to be a OnceTrigger
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(window)); // ready
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testEarlyAndAtWatermark() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(mockEarly),
-        FixedWindows.of(Duration.millis(100)));
-
-    injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
-    testRunningAsTrigger(mockEarly, window);
-
-    // Fire due to watermark
-    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
-    tester.advanceInputWatermark(new Instant(100));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testAtWatermarkAndLate() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterWatermark.pastEndOfWindow()
-            .withLateFirings(mockLate),
-        FixedWindows.of(Duration.millis(100)));
-
-    injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
-    // No early firing, just double checking
-    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertFalse(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // Fire due to watermark
-    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
-    tester.advanceInputWatermark(new Instant(100));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    testRunningAsTrigger(mockLate, window);
-  }
-
-  @Test
-  public void testEarlyAndAtWatermarkAndLate() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(mockEarly)
-            .withLateFirings(mockLate),
-        FixedWindows.of(Duration.millis(100)));
-
-    injectElements(1);
-    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
-    testRunningAsTrigger(mockEarly, window);
-
-    // Fire due to watermark
-    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
-    tester.advanceInputWatermark(new Instant(100));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    testRunningAsTrigger(mockLate, window);
-  }
-
-  /**
-   * Tests that if the EOW is finished in both as well as the merged window, then
-   * it is finished in the merged result.
-   *
-   * <p>Because windows are discarded when a trigger finishes, we need to embed this
-   * in a sequence in order to check that it is re-activated. So this test is potentially
-   * sensitive to other triggers' correctness.
-   */
-  @Test
-  public void testOnMergeAlreadyFinished() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterWatermark.pastEndOfWindow(),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    tester.injectElements(5);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
-    // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    tester.fireIfShouldFire(secondWindow);
-
-    // Confirm that we are on the second trigger by probing
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.injectElements(1);
-    tester.injectElements(5);
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    tester.fireIfShouldFire(secondWindow);
-
-    // Merging should leave it finished
-    tester.mergeWindows();
-
-    // Confirm that we are on the second trigger by probing
-    assertFalse(tester.shouldFire(mergedWindow));
-    tester.injectElements(1);
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  /**
-   * Tests that the trigger rewinds to be non-finished in the merged window.
-   *
-   * <p>Because windows are discarded when a trigger finishes, we need to embed this
-   * in a sequence in order to check that it is re-activated. So this test is potentially
-   * sensitive to other triggers' correctness.
-   */
-  @Test
-  public void testOnMergeRewinds() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterWatermark.pastEndOfWindow(),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    tester.injectElements(5);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
-    // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Confirm that we are on the second trigger by probing
-    assertFalse(tester.shouldFire(firstWindow));
-    tester.injectElements(1);
-    assertTrue(tester.shouldFire(firstWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Merging should re-activate the watermark trigger in the merged window
-    tester.mergeWindows();
-
-    // Confirm that we are not on the second trigger by probing
-    assertFalse(tester.shouldFire(mergedWindow));
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // And confirm that advancing the watermark fires again
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  /**
-   * Tests that if the EOW is finished in both as well as the merged window, then
-   * it is finished in the merged result.
-   *
-   * <p>Because windows are discarded when a trigger finishes, we need to embed this
-   * in a sequence in order to check that it is re-activated. So this test is potentially
-   * sensitive to other triggers' correctness.
-   */
-  @Test
-  public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
-            .withLateFirings(AfterPane.elementCountAtLeast(1)),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    tester.injectElements(5);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
-    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    tester.fireIfShouldFire(secondWindow);
-
-    // Confirm that we are on the late trigger by probing
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.injectElements(1);
-    tester.injectElements(5);
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-    tester.fireIfShouldFire(secondWindow);
-
-    // Merging should leave it on the late trigger
-    tester.mergeWindows();
-
-    // Confirm that we are on the late trigger by probing
-    assertFalse(tester.shouldFire(mergedWindow));
-    tester.injectElements(1);
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  /**
-   * Tests that the trigger rewinds to be non-finished in the merged window.
-   *
-   * <p>Because windows are discarded when a trigger finishes, we need to embed this
-   * in a sequence in order to check that it is re-activated. So this test is potentially
-   * sensitive to other triggers' correctness.
-   */
-  @Test
-  public void testEarlyAndLateOnMergeRewinds() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
-            .withLateFirings(AfterPane.elementCountAtLeast(1)),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    tester.injectElements(5);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
-    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Confirm that we are on the late trigger by probing
-    assertFalse(tester.shouldFire(firstWindow));
-    tester.injectElements(1);
-    assertTrue(tester.shouldFire(firstWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Merging should re-activate the early trigger in the merged window
-    tester.mergeWindows();
-
-    // Confirm that we are not on the second trigger by probing
-    assertFalse(tester.shouldFire(mergedWindow));
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // And confirm that advancing the watermark fires again
-    tester.advanceInputWatermark(new Instant(15));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testFromEndOfWindowToString() {
-    Trigger trigger = AfterWatermark.pastEndOfWindow();
-    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
-  }
-
-  @Test
-  public void testEarlyFiringsToString() {
-    Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
-
-    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
-  }
-
-  @Test
-  public void testLateFiringsToString() {
-    Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
-
-    assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
-  }
-
-  @Test
-  public void testEarlyAndLateFiringsToString() {
-    Trigger trigger =
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(StubTrigger.named("t1"))
-            .withLateFirings(StubTrigger.named("t2"));
-
-    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
-        trigger.toString());
-  }
-
-  @Test
-  public void testToStringExcludesNeverTrigger() {
-    Trigger trigger =
-        AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(Never.ever())
-            .withLateFirings(Never.ever());
-
-    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
deleted file mode 100644
index 673e555..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link DefaultTrigger}, which should be equivalent to
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- */
-@RunWith(JUnit4.class)
-public class DefaultTriggerTest {
-
-  SimpleTriggerTester<IntervalWindow> tester;
-
-  @Test
-  public void testDefaultTriggerFixedWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        DefaultTrigger.of(),
-        FixedWindows.of(Duration.millis(100)));
-
-    tester.injectElements(
-        1, // [0, 100)
-        101); // [100, 200)
-
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
-
-    // Advance the watermark almost to the end of the first window.
-    tester.advanceInputWatermark(new Instant(99));
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Advance watermark past end of the first window, which is then ready
-    tester.advanceInputWatermark(new Instant(100));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Fire, but the first window is still allowed to fire
-    tester.fireIfShouldFire(firstWindow);
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Advance watermark to 200, then both are ready
-    tester.advanceInputWatermark(new Instant(200));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-
-    assertFalse(tester.isMarkedFinished(firstWindow));
-    assertFalse(tester.isMarkedFinished(secondWindow));
-  }
-
-  @Test
-  public void testDefaultTriggerSlidingWindows() throws Exception {
-    tester = TriggerTester.forTrigger(
-        DefaultTrigger.of(),
-        SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
-
-    tester.injectElements(
-        1, // [-50, 50), [0, 100)
-        50); // [0, 100), [50, 150)
-
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
-    IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
-
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(thirdWindow));
-
-    // At 50, the first becomes ready; it stays ready after firing
-    tester.advanceInputWatermark(new Instant(50));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(thirdWindow));
-    tester.fireIfShouldFire(firstWindow);
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(thirdWindow));
-
-    // At 99, the first is still the only one ready
-    tester.advanceInputWatermark(new Instant(99));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(thirdWindow));
-
-    // At 100, the first and second are ready
-    tester.advanceInputWatermark(new Instant(100));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(thirdWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    assertFalse(tester.isMarkedFinished(firstWindow));
-    assertFalse(tester.isMarkedFinished(secondWindow));
-    assertFalse(tester.isMarkedFinished(thirdWindow));
-  }
-
-  @Test
-  public void testDefaultTriggerSessions() throws Exception {
-    tester = TriggerTester.forTrigger(
-        DefaultTrigger.of(),
-        Sessions.withGapDuration(Duration.millis(100)));
-
-    tester.injectElements(
-        1, // [1, 101)
-        50); // [50, 150)
-    tester.mergeWindows();
-
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
-
-    // Not ready in any window yet
-    tester.advanceInputWatermark(new Instant(100));
-    assertFalse(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // The first window is "ready": the caller owns knowledge of which windows are merged away
-    tester.advanceInputWatermark(new Instant(149));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertFalse(tester.shouldFire(secondWindow));
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Now ready on all windows
-    tester.advanceInputWatermark(new Instant(150));
-    assertTrue(tester.shouldFire(firstWindow));
-    assertTrue(tester.shouldFire(secondWindow));
-    assertTrue(tester.shouldFire(mergedWindow));
-
-    // Ensure it repeats
-    tester.fireIfShouldFire(mergedWindow);
-    assertTrue(tester.shouldFire(mergedWindow));
-
-    assertFalse(tester.isMarkedFinished(mergedWindow));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
-        new IntervalWindow(new Instant(0), new Instant(10))));
-    assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
-        DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
deleted file mode 100644
index 1e3a1ff..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ExecutableTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ExecutableTriggerTest {
-
-  @Test
-  public void testIndexAssignmentLeaf() throws Exception {
-    StubTrigger t1 = new StubTrigger();
-    ExecutableTrigger executable = ExecutableTrigger.create(t1);
-    assertEquals(0, executable.getTriggerIndex());
-  }
-
-  @Test
-  public void testIndexAssignmentOneLevel() throws Exception {
-    StubTrigger t1 = new StubTrigger();
-    StubTrigger t2 = new StubTrigger();
-    StubTrigger t = new StubTrigger(t1, t2);
-
-    ExecutableTrigger executable = ExecutableTrigger.create(t);
-
-    assertEquals(0, executable.getTriggerIndex());
-    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
-    assertSame(t1, executable.subTriggers().get(0).getSpec());
-    assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
-    assertSame(t2, executable.subTriggers().get(1).getSpec());
-  }
-
-  @Test
-  public void testIndexAssignmentTwoLevel() throws Exception {
-    StubTrigger t11 = new StubTrigger();
-    StubTrigger t12 = new StubTrigger();
-    StubTrigger t13 = new StubTrigger();
-    StubTrigger t14 = new StubTrigger();
-    StubTrigger t21 = new StubTrigger();
-    StubTrigger t22 = new StubTrigger();
-    StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
-    StubTrigger t2 = new StubTrigger(t21, t22);
-    StubTrigger t = new StubTrigger(t1, t2);
-
-    ExecutableTrigger executable = ExecutableTrigger.create(t);
-
-    assertEquals(0, executable.getTriggerIndex());
-    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
-    assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
-    assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
-
-    assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
-    assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
-    assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
-  }
-
-  private static class StubTrigger extends Trigger {
-
-    @SafeVarargs
-    protected StubTrigger(Trigger... subTriggers) {
-      super(Arrays.asList(subTriggers));
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception { }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception { }
-
-    @Override
-    public void clear(TriggerContext c) throws Exception {
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE;
-    }
-
-    @Override
-    public boolean isCompatible(Trigger other) {
-      return false;
-    }
-
-    @Override
-    public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public boolean shouldFire(TriggerContext c) {
-      return false;
-    }
-
-    @Override
-    public void onFire(TriggerContext c) { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
deleted file mode 100644
index 7f74620..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersBitSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersBitSetTest {
-  /**
-   * Tests that after a trigger is set to finished, it reads back as finished.
-   */
-  @Test
-  public void testSetGet() {
-    FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
-  }
-
-  /**
-   * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
-   * others.
-   */
-  @Test
-  public void testClearRecursively() {
-    FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
-  }
-
-  @Test
-  public void testCopy() throws Exception {
-    FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
-    assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
deleted file mode 100644
index a66f74f..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-
-/**
- * Generalized tests for {@link FinishedTriggers} implementations.
- */
-public class FinishedTriggersProperties {
-  /**
-   * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
-   * finished, it is correctly reported as finished.
-   */
-  public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
-    assertFalse(finishedSet.isFinished(trigger));
-    finishedSet.setFinished(trigger, true);
-    assertTrue(finishedSet.isFinished(trigger));
-  }
-
-  /**
-   * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
-   * reported as finished.
-   */
-  public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
-    ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
-        AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
-    verifyGetAfterSet(finishedSet, trigger);
-    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
-    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
-    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
-    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
-    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
-  }
-
-  /**
-   * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
-   * others.
-   */
-  public static void verifyClearRecursively(FinishedTriggers finishedSet) {
-    ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
-        AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
-        AfterAll.of(
-            AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
-    // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
-    setFinishedRecursively(finishedSet, trigger);
-    assertTrue(finishedSet.isFinished(trigger));
-    assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
-    assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
-    assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
-
-    // Clear just the second AfterAll
-    finishedSet.clearRecursively(trigger.subTriggers().get(1));
-
-    // Check that the first and all that are still finished
-    assertTrue(finishedSet.isFinished(trigger));
-    verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
-    verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
-  }
-
-  private static void setFinishedRecursively(
-      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
-    finishedSet.setFinished(trigger, true);
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      setFinishedRecursively(finishedSet, subTrigger);
-    }
-  }
-
-  private static void verifyFinishedRecursively(
-      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
-    assertTrue(finishedSet.isFinished(trigger));
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      verifyFinishedRecursively(finishedSet, subTrigger);
-    }
-  }
-
-  private static void verifyUnfinishedRecursively(
-      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
-    assertFalse(finishedSet.isFinished(trigger));
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      verifyUnfinishedRecursively(finishedSet, subTrigger);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
deleted file mode 100644
index 072d264..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import java.util.HashSet;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersSetTest {
-  /**
-   * Tests that after a trigger is set to finished, it reads back as finished.
-   */
-  @Test
-  public void testSetGet() {
-    FinishedTriggersProperties.verifyGetAfterSet(
-        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
-  }
-
-  /**
-   * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
-   * others.
-   */
-  @Test
-  public void testClearRecursively() {
-    FinishedTriggersProperties.verifyClearRecursively(
-        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
-  }
-
-  @Test
-  public void testCopy() throws Exception {
-    FinishedTriggersSet finishedSet =
-        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
-    assertThat(finishedSet.copy().getFinishedTriggers(),
-        not(theInstance(finishedSet.getFinishedTriggers())));
-  }
-}


Reply via email to