http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
new file mode 100644
index 0000000..38d030e
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterPane}.
+ */
+@RunWith(JUnit4.class)
+public class AfterPaneTest {
+
+  SimpleTriggerTester<IntervalWindow> tester;
+  /**
+   * Tests that the trigger does fire when enough elements are in a window, 
and that it only
+   * fires that window (no leakage).
+   */
+  @Test
+  public void testAfterPaneElementCountFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterPane.elementCountAtLeast(2),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1); // [0, 10)
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2); // [0, 10)
+    tester.injectElements(11); // [10, 20)
+
+    assertTrue(tester.shouldFire(window)); // ready to fire
+    tester.fireIfShouldFire(window); // and finished
+    assertTrue(tester.isMarkedFinished(window));
+
+    // But don't finish the other window
+    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), 
new Instant(20))));
+  }
+
+  @Test
+  public void testClear() throws Exception {
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterPane.elementCountAtLeast(2),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1, 2, 3);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.clearState(window);
+    tester.assertCleared(window);
+  }
+
+  @Test
+  public void testAfterPaneElementCountSessions() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterPane.elementCountAtLeast(2),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(
+        1, // in [1, 11)
+        2); // in [2, 12)
+
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new 
Instant(11))));
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new 
Instant(12))));
+
+    tester.mergeWindows();
+
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(12));
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+    assertTrue(tester.isMarkedFinished(mergedWindow));
+
+    // Because we closed the previous window, we don't have it around to merge 
with. So there
+    // will be a new FIRE_AND_FINISH result.
+    tester.injectElements(
+        7,  // in [7, 17)
+        9); // in [9, 19)
+
+    tester.mergeWindows();
+
+    IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new 
Instant(19));
+    assertTrue(tester.shouldFire(newMergedWindow));
+    tester.fireIfShouldFire(newMergedWindow);
+    assertTrue(tester.isMarkedFinished(newMergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
+            new IntervalWindow(new Instant(0), new Instant(10))));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    assertEquals(
+        AfterPane.elementCountAtLeast(1),
+        AfterPane.elementCountAtLeast(100).getContinuationTrigger());
+    assertEquals(
+        AfterPane.elementCountAtLeast(1),
+        
AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = AfterPane.elementCountAtLeast(5);
+    assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
new file mode 100644
index 0000000..13a7acf
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterProcessingTimeTest {
+
+  /**
+   * Tests the basic property that the trigger does wait for processing time 
to be
+   * far enough advanced.
+   */
+  @Test
+  public void testAfterProcessingTimeFixedWindows() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+
+    // Timer at 15
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.advanceProcessingTime(new Instant(12));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    // Load up elements in the next window, timer at 17 for them
+    tester.injectElements(11, 12, 13);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(20));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Not quite time to fire
+    tester.advanceProcessingTime(new Instant(14));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Timer at 19 for these in the first window; it should be ignored since 
the 15 will fire first
+    tester.injectElements(2, 3);
+
+    // Advance past the first timer and fire, finishing the first window
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.isMarkedFinished(firstWindow));
+
+    // The next window fires and finishes now
+    tester.advanceProcessingTime(new Instant(18));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(secondWindow);
+    assertTrue(tester.isMarkedFinished(secondWindow));
+  }
+
+  /**
+   * Tests that when windows merge, if the trigger is waiting for "N millis 
after the first
+   * element" that it is relative to the earlier of the two merged windows.
+   */
+  @Test
+  public void testClear() throws Exception {
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1, 2, 3);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.clearState(window);
+    tester.assertCleared(window);
+  }
+
+  @Test
+  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.advanceProcessingTime(new Instant(10));
+    tester.injectElements(1); // in [1, 11), timer for 15
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.advanceProcessingTime(new Instant(12));
+    tester.injectElements(3); // in [3, 13), timer for 17
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new 
Instant(13));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(13));
+
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        
AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
+            new IntervalWindow(new Instant(0), new Instant(10))));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger firstElementPlus1 =
+        
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
+    assertEquals(
+        new AfterSynchronizedProcessingTime(),
+        firstElementPlus1.getContinuationTrigger());
+  }
+
+  /**
+   * Basic test of compatibility check between identical triggers.
+   */
+  @Test
+  public void testCompatibilityIdentical() throws Exception {
+    Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(1L));
+    Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(1L));
+    assertTrue(t1.isCompatible(t2));
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+    assertEquals("AfterProcessingTime.pastFirstElementInPane()", 
trigger.toString());
+  }
+
+  @Test
+  public void testWithDelayToString() {
+    Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
+        .plusDelayOf(Duration.standardMinutes(5));
+
+    assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 
minutes)",
+        trigger.toString());
+  }
+
+  @Test
+  public void testBuiltUpToString() {
+    Trigger trigger = AfterWatermark.pastEndOfWindow()
+        .withLateFirings(AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(10)));
+
+    String expected = "AfterWatermark.pastEndOfWindow()"
+        + ".withLateFirings(AfterProcessingTime"
+        + ".pastFirstElementInPane()"
+        + ".plusDelayOf(10 minutes))";
+
+    assertEquals(expected, trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
new file mode 100644
index 0000000..7e6e938
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeTest {
+
+  private Trigger underTest = new AfterSynchronizedProcessingTime();
+
+  @Test
+  public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+
+    // Timer at 15
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.advanceProcessingTime(new Instant(12));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    // Load up elements in the next window, timer at 17 for them
+    tester.injectElements(11, 12, 13);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(20));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Not quite time to fire
+    tester.advanceProcessingTime(new Instant(14));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Timer at 19 for these in the first window; it should be ignored since 
the 15 will fire first
+    tester.injectElements(2, 3);
+
+    // Advance past the first timer and fire, finishing the first window
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.isMarkedFinished(firstWindow));
+
+    // The next window fires and finishes now
+    tester.advanceProcessingTime(new Instant(18));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(secondWindow);
+    assertTrue(tester.isMarkedFinished(secondWindow));
+  }
+
+  @Test
+  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        Sessions.withGapDuration(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+    tester.injectElements(1); // in [1, 11), timer for 15
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.advanceProcessingTime(new Instant(12));
+    tester.injectElements(3); // in [3, 13), timer for 17
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new 
Instant(13));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(13));
+
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        underTest.getWatermarkThatGuaranteesFiring(
+            new IntervalWindow(new Instant(0), new Instant(10))));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    assertEquals(underTest, underTest.getContinuationTrigger());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
new file mode 100644
index 0000000..084027b
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests the {@link AfterWatermark} triggers.
+ */
+@RunWith(JUnit4.class)
+public class AfterWatermarkTest {
+
+  @Mock private OnceTrigger mockEarly;
+  @Mock private OnceTrigger mockLate;
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+  private static Trigger.OnElementContext anyElementContext() {
+    return Mockito.<Trigger.OnElementContext>any();
+  }
+
+  private void injectElements(int... elements) throws Exception {
+    for (int element : elements) {
+      doNothing().when(mockEarly).onElement(anyElementContext());
+      doNothing().when(mockLate).onElement(anyElementContext());
+      tester.injectElements(element);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow 
window)
+      throws Exception {
+
+    // Don't fire due to mock saying no
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    assertFalse(tester.shouldFire(window)); // not ready
+
+    // Fire due to mock trigger; early trigger is required to be a OnceTrigger
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // ready
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testEarlyAndAtWatermark() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(mockEarly),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testAtWatermarkAndLate() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // No early firing, just double checking
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertFalse(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  @Test
+  public void testEarlyAndAtWatermarkAndLate() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(mockEarly)
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  /**
+   * Tests that if the EOW is finished in both as well as the merged window, 
then
+   * it is finished in the merged result.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to 
embed this
+   * in a sequence in order to check that it is re-activated. So this test is 
potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterWatermark.pastEndOfWindow(),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it finished
+    tester.mergeWindows();
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to 
embed this
+   * in a sequence in order to check that it is re-activated. So this test is 
potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeRewinds() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterWatermark.pastEndOfWindow(),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first 
window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the watermark trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that if the EOW is finished in both as well as the merged window, 
then
+   * it is finished in the merged result.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to 
embed this
+   * in a sequence in order to check that it is re-activated. So this test is 
potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+            .withLateFirings(AfterPane.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both 
windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it on the late trigger
+    tester.mergeWindows();
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to 
embed this
+   * in a sequence in order to check that it is re-activated. So this test is 
potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeRewinds() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+            .withLateFirings(AfterPane.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only 
the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the early trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFromEndOfWindowToString() {
+    Trigger trigger = AfterWatermark.pastEndOfWindow();
+    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+  }
+
+  @Test
+  public void testEarlyFiringsToString() {
+    Trigger trigger = 
AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", 
trigger.toString());
+  }
+
+  @Test
+  public void testLateFiringsToString() {
+    Trigger trigger = 
AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", 
trigger.toString());
+  }
+
+  @Test
+  public void testEarlyAndLateFiringsToString() {
+    Trigger trigger =
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(StubTrigger.named("t1"))
+            .withLateFirings(StubTrigger.named("t2"));
+
+    
assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
+        trigger.toString());
+  }
+
+  @Test
+  public void testToStringExcludesNeverTrigger() {
+    Trigger trigger =
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(Never.ever())
+            .withLateFirings(Never.ever());
+
+    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
new file mode 100644
index 0000000..673e555
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link DefaultTrigger}, which should be equivalent to
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultTriggerTest {
+
+  SimpleTriggerTester<IntervalWindow> tester;
+
+  @Test
+  public void testDefaultTriggerFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        FixedWindows.of(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [0, 100)
+        101); // [100, 200)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new 
Instant(200));
+
+    // Advance the watermark almost to the end of the first window.
+    tester.advanceInputWatermark(new Instant(99));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark past end of the first window, which is then ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Fire, but the first window is still allowed to fire
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark to 200, then both are ready
+    tester.advanceInputWatermark(new Instant(200));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+  }
+
+  @Test
+  public void testDefaultTriggerSlidingWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
+
+    tester.injectElements(
+        1, // [-50, 50), [0, 100)
+        50); // [0, 100), [50, 150)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new 
Instant(50));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new 
Instant(150));
+
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 50, the first becomes ready; it stays ready after firing
+    tester.advanceInputWatermark(new Instant(50));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 99, the first is still the only one ready
+    tester.advanceInputWatermark(new Instant(99));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 100, the first and second are ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+    assertFalse(tester.isMarkedFinished(thirdWindow));
+  }
+
+  @Test
+  public void testDefaultTriggerSessions() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        Sessions.withGapDuration(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [1, 101)
+        50); // [50, 150)
+    tester.mergeWindows();
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(101));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new 
Instant(150));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(150));
+
+    // Not ready in any window yet
+    tester.advanceInputWatermark(new Instant(100));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // The first window is "ready": the caller owns knowledge of which windows 
are merged away
+    tester.advanceInputWatermark(new Instant(149));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now ready on all windows
+    tester.advanceInputWatermark(new Instant(150));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    // Ensure it repeats
+    tester.fireIfShouldFire(mergedWindow);
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    assertFalse(tester.isMarkedFinished(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(new Instant(9), 
DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
+        new IntervalWindow(new Instant(0), new Instant(10))));
+    assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
+        
DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    assertEquals(DefaultTrigger.of(), 
DefaultTrigger.of().getContinuationTrigger());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
new file mode 100644
index 0000000..1e3a1ff
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ExecutableTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ExecutableTriggerTest {
+
+  @Test
+  public void testIndexAssignmentLeaf() throws Exception {
+    StubTrigger t1 = new StubTrigger();
+    ExecutableTrigger executable = ExecutableTrigger.create(t1);
+    assertEquals(0, executable.getTriggerIndex());
+  }
+
+  @Test
+  public void testIndexAssignmentOneLevel() throws Exception {
+    StubTrigger t1 = new StubTrigger();
+    StubTrigger t2 = new StubTrigger();
+    StubTrigger t = new StubTrigger(t1, t2);
+
+    ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertSame(t1, executable.subTriggers().get(0).getSpec());
+    assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
+    assertSame(t2, executable.subTriggers().get(1).getSpec());
+  }
+
+  @Test
+  public void testIndexAssignmentTwoLevel() throws Exception {
+    StubTrigger t11 = new StubTrigger();
+    StubTrigger t12 = new StubTrigger();
+    StubTrigger t13 = new StubTrigger();
+    StubTrigger t14 = new StubTrigger();
+    StubTrigger t21 = new StubTrigger();
+    StubTrigger t22 = new StubTrigger();
+    StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
+    StubTrigger t2 = new StubTrigger(t21, t22);
+    StubTrigger t = new StubTrigger(t1, t2);
+
+    ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertEquals(6, 
executable.subTriggers().get(0).getFirstIndexAfterSubtree());
+    assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
+
+    assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
+  }
+
+  private static class StubTrigger extends Trigger {
+
+    @SafeVarargs
+    protected StubTrigger(Trigger... subTriggers) {
+      super(Arrays.asList(subTriggers));
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception { }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception { }
+
+    @Override
+    public void clear(TriggerContext c) throws Exception {
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+
+    @Override
+    public boolean isCompatible(Trigger other) {
+      return false;
+    }
+
+    @Override
+    public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public boolean shouldFire(TriggerContext c) {
+      return false;
+    }
+
+    @Override
+    public void onFire(TriggerContext c) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
new file mode 100644
index 0000000..7f74620
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersBitSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersBitSetTest {
+  /**
+   * Tests that after a trigger is set to finished, it reads back as finished.
+   */
+  @Test
+  public void testSetGet() {
+    
FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that triggers 
subTriggers, but no
+   * others.
+   */
+  @Test
+  public void testClearRecursively() {
+    
FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
+  }
+
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersBitSet finishedSet = 
FinishedTriggersBitSet.emptyWithCapacity(10);
+    assertThat(finishedSet.copy().getBitSet(), 
not(theInstance(finishedSet.getBitSet())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
new file mode 100644
index 0000000..a66f74f
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+
+/**
+ * Generalized tests for {@link FinishedTriggers} implementations.
+ */
+public class FinishedTriggersProperties {
+  /**
+   * Tests that for the provided trigger and {@link FinishedTriggers}, when 
the trigger is set
+   * finished, it is correctly reported as finished.
+   */
+  public static void verifyGetAfterSet(FinishedTriggers finishedSet, 
ExecutableTrigger trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    finishedSet.setFinished(trigger, true);
+    assertTrue(finishedSet.isFinished(trigger));
+  }
+
+  /**
+   * For a few arbitrary triggers, tests that when the trigger is set finished 
it is correctly
+   * reported as finished.
+   */
+  public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
+    ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+        AfterFirst.of(AfterPane.elementCountAtLeast(3), 
AfterWatermark.pastEndOfWindow()),
+        AfterAll.of(
+            AfterPane.elementCountAtLeast(10), 
AfterProcessingTime.pastFirstElementInPane())));
+
+    verifyGetAfterSet(finishedSet, trigger);
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(0).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(1).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(1).subTriggers().get(0));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that triggers 
subTriggers, but no
+   * others.
+   */
+  public static void verifyClearRecursively(FinishedTriggers finishedSet) {
+    ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+        AfterFirst.of(AfterPane.elementCountAtLeast(3), 
AfterWatermark.pastEndOfWindow()),
+        AfterAll.of(
+            AfterPane.elementCountAtLeast(10), 
AfterProcessingTime.pastFirstElementInPane())));
+
+    // Set them all finished. This method is not on a trigger as it makes no 
sense outside tests.
+    setFinishedRecursively(finishedSet, trigger);
+    assertTrue(finishedSet.isFinished(trigger));
+    assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
+    
assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
+    
assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
+
+    // Clear just the second AfterAll
+    finishedSet.clearRecursively(trigger.subTriggers().get(1));
+
+    // Check that the first and all that are still finished
+    assertTrue(finishedSet.isFinished(trigger));
+    verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
+    verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
+  }
+
+  private static void setFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    finishedSet.setFinished(trigger, true);
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      setFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  private static void verifyFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    assertTrue(finishedSet.isFinished(trigger));
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      verifyFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  private static void verifyUnfinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      verifyUnfinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
new file mode 100644
index 0000000..072d264
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersSetTest {
+  /**
+   * Tests that after a trigger is set to finished, it reads back as finished.
+   */
+  @Test
+  public void testSetGet() {
+    FinishedTriggersProperties.verifyGetAfterSet(
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that triggers 
subTriggers, but no
+   * others.
+   */
+  @Test
+  public void testClearRecursively() {
+    FinishedTriggersProperties.verifyClearRecursively(
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+  }
+
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersSet finishedSet =
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+    assertThat(finishedSet.copy().getFinishedTriggers(),
+        not(theInstance(finishedSet.getFinishedTriggers())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
new file mode 100644
index 0000000..fb2b4d5
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+  private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+  @Before
+  public void setup() throws Exception {
+    triggerTester =
+        TriggerTester.forTrigger(
+            Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
+  }
+
+  @Test
+  public void falseAfterEndOfWindow() throws Exception {
+    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardMinutes(5)));
+    assertThat(triggerTester.shouldFire(window), is(false));
+    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(triggerTester.shouldFire(window), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
new file mode 100644
index 0000000..7289d97
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link OrFinallyTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class OrFinallyTriggerTest {
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires and finishes, the {@code OrFinally} also fires and finishes.
+   */
+  @Test
+  public void testActualFiresAndFinishes() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            AfterPane.elementCountAtLeast(2),
+            AfterPane.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires and finishes
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires but does not finish, the {@code OrFinally} also fires and also does 
not
+   * finish.
+   */
+  @Test
+  public void testActualFiresOnly() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+            AfterPane.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but does not finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // And again
+    tester.injectElements(3, 4);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that if the first trigger rewinds to be non-finished in the merged 
window,
+   * then it becomes the currently active trigger again, with real triggers.
+   */
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterPane.elementCountAtLeast(5)
+                .orFinally(AfterWatermark.pastEndOfWindow()),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    // Finished the orFinally in the first window
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Set up second window where it is not done
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them, if the merged window were on the second trigger, it would 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now adding 3 more makes the main trigger ready to fire
+    tester.injectElements(1, 2, 3, 4, 5);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, until)} when {@code actual}
+   * fires but does not finish, then {@code until} fires and finishes, the
+   * whole thing fires and finished.
+   */
+  @Test
+  public void testActualFiresButUntilFinishes() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+                AfterPane.elementCountAtLeast(3)),
+        FixedWindows.of(Duration.millis(10)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    // Before any firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but doesn't finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The until fires and finishes; the trigger is finished
+    tester.injectElements(3);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    assertEquals(new Instant(9),
+        Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9), 
Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+        .orFinally(AfterPane.elementCountAtLeast(1))
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9), 
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+        .orFinally(AfterWatermark.pastEndOfWindow())
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9),
+        AfterPane.elementCountAtLeast(100)
+            .orFinally(AfterWatermark.pastEndOfWindow())
+            .getWatermarkThatGuaranteesFiring(window));
+
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+        .orFinally(AfterPane.elementCountAtLeast(10))
+        .getWatermarkThatGuaranteesFiring(window));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
+    OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
+    Trigger aOrFinallyB = triggerA.orFinally(triggerB);
+    Trigger bOrFinallyA = triggerB.orFinally(triggerA);
+    assertEquals(
+        Repeatedly.forever(
+            
triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
+        aOrFinallyB.getContinuationTrigger());
+    assertEquals(
+        Repeatedly.forever(
+            
triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
+        bOrFinallyA.getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = 
StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
+    assertEquals("t1.orFinally(t2)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
new file mode 100644
index 0000000..6e8930d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link Repeatedly}.
+ */
+@RunWith(JUnit4.class)
+public class RepeatedlyTest {
+
+  @Mock private Trigger mockTrigger;
+  private SimpleTriggerTester<IntervalWindow> tester;
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+
+  public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws 
Exception {
+    MockitoAnnotations.initMocks(this);
+    tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), 
windowFn);
+  }
+
+  /**
+   * Tests that onElement correctly passes the data on to the subtrigger.
+   */
+  @Test
+  public void testOnElement() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(37);
+    verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
+  }
+
+  /**
+   * Tests that the repeatedly is ready to fire whenever the subtrigger is 
ready.
+   */
+  @Test
+  public void testShouldFire() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+
+    when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
+        .thenReturn(false);
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+  }
+
+  /**
+   * Tests that the watermark that guarantees firing is that of the subtrigger.
+   */
+  @Test
+  public void testFireDeadline() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    Instant arbitraryInstant = new Instant(34957849);
+
+    
when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
+        .thenReturn(arbitraryInstant);
+
+    assertThat(
+        
Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
+        equalTo(arbitraryInstant));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+    Trigger repeatedly = Repeatedly.forever(trigger);
+    assertEquals(
+        Repeatedly.forever(trigger.getContinuationTrigger()), 
repeatedly.getContinuationTrigger());
+    assertEquals(
+        
Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
+        repeatedly.getContinuationTrigger().getContinuationTrigger());
+  }
+
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerTester.forTrigger(
+        Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them, if the merged window were on the second trigger, it would 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.of(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.of(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+
+  @Test
+  public void testToString() {
+    Trigger trigger = Repeatedly.forever(new StubTrigger() {
+        @Override
+        public String toString() {
+          return "innerTrigger";
+        }
+      });
+
+    assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
new file mode 100644
index 0000000..83077f4
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReshuffleTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ReshuffleTriggerTest {
+
+  /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
+  public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
+    return new ReshuffleTrigger<>();
+  }
+
+  @Test
+  public void testShouldFire() throws Exception {
+    TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
+        new ReshuffleTrigger<IntervalWindow>(), 
FixedWindows.of(Duration.millis(100)));
+    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new 
Instant(400));
+    assertTrue(tester.shouldFire(arbitraryWindow));
+  }
+
+  @Test
+  public void testOnTimer() throws Exception {
+    TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
+        new ReshuffleTrigger<IntervalWindow>(), 
FixedWindows.of(Duration.millis(100)));
+    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new 
Instant(200));
+    tester.fireIfShouldFire(arbitraryWindow);
+    assertFalse(tester.isMarkedFinished(arbitraryWindow));
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = new ReshuffleTrigger<>();
+    assertEquals("ReshuffleTrigger()", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
new file mode 100644
index 0000000..b258a79
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.joda.time.Instant;
+
+/**
+ * No-op {@link OnceTrigger} implementation for testing.
+ */
+abstract class StubTrigger extends Trigger.OnceTrigger {
+  /**
+   * Create a stub {@link Trigger} instance which returns the specified name 
on {@link #toString()}.
+   */
+  static StubTrigger named(final String name) {
+    return new StubTrigger() {
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  protected StubTrigger() {
+    super(Lists.<Trigger>newArrayList());
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+  }
+
+  @Override
+  public boolean shouldFire(TriggerContext context) throws Exception {
+    return false;
+  }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
+    return null;
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
new file mode 100644
index 0000000..cfc03b2
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Trigger}.
+ */
+@RunWith(JUnit4.class)
+public class TriggerTest {
+
+  @Test
+  public void testTriggerToString() throws Exception {
+    assertEquals("AfterWatermark.pastEndOfWindow()", 
AfterWatermark.pastEndOfWindow().toString());
+    assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
+        Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
+  }
+
+  @Test
+  public void testIsCompatible() throws Exception {
+    assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
+    assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
+        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
+
+    assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
+    assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
+        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
+  }
+
+  private static class Trigger1 extends Trigger {
+
+    private Trigger1(List<Trigger> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public void onElement(Trigger.OnElementContext c) { }
+
+    @Override
+    public void onMerge(Trigger.OnMergeContext c) { }
+
+    @Override
+    protected Trigger getContinuationTrigger(
+        List<Trigger> continuationTriggers) {
+      return null;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return null;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      return false;
+    }
+
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception { }
+  }
+
+  private static class Trigger2 extends Trigger {
+
+    private Trigger2(List<Trigger> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public void onElement(Trigger.OnElementContext c) { }
+
+    @Override
+    public void onMerge(Trigger.OnMergeContext c) { }
+
+    @Override
+    protected Trigger getContinuationTrigger(
+        List<Trigger> continuationTriggers) {
+      return null;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return null;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      return false;
+    }
+
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception { }
+  }
+}


Reply via email to