Repository: incubator-beam
Updated Branches:
  refs/heads/master 73226168a -> e969f3d38


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
new file mode 100644
index 0000000..119c937
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests the {@link AfterWatermarkStateMachine} triggers.
+ */
+@RunWith(JUnit4.class)
+public class AfterWatermarkStateMachineTest {
+
+  @Mock private OnceTriggerStateMachine mockEarly;
+  @Mock private OnceTriggerStateMachine mockLate;
+
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+  /**
+   * Shorthand for {@code Mockito.any()} typed as {@link TriggerStateMachine.TriggerContext},
+   * used as the argument when stubbing {@code shouldFire} on the mock triggers.
+   */
+  private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+    return Mockito.<TriggerStateMachine.TriggerContext>any();
+  }
+  /**
+   * Shorthand for {@code Mockito.any()} typed as {@link TriggerStateMachine.OnElementContext},
+   * used as the argument when stubbing {@code onElement} on the mock triggers.
+   */
+  private static TriggerStateMachine.OnElementContext anyElementContext() {
+    return Mockito.<TriggerStateMachine.OnElementContext>any();
+  }
+
+  /**
+   * Injects the given elements into {@link #tester}, one {@code injectElements} call per
+   * element, with both mock triggers stubbed to do nothing in {@code onElement}.
+   *
+   * @param elements the values to inject, in order
+   * @throws Exception if stubbing or injection fails
+   */
+  private void injectElements(int... elements) throws Exception {
+    // The stubbing does not depend on the element being injected, so set it up once
+    // instead of repeating it on every loop iteration.
+    doNothing().when(mockEarly).onElement(anyElementContext());
+    doNothing().when(mockLate).onElement(anyElementContext());
+    for (int element : elements) {
+      tester.injectElements(element);
+    }
+  }
+
+  /** Initializes the {@code @Mock}-annotated fields of this class before each test. */
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Shared helper (not itself a {@code @Test}): stubs {@code mockTrigger.shouldFire} to return
+   * false and then true, and checks that {@link #tester} reports the same readiness for
+   * {@code window}. After firing, the window must not be marked finished.
+   *
+   * @param mockTrigger the mock sub-trigger whose {@code shouldFire} answer is controlled
+   * @param window the window to probe and fire
+   */
+  public void testRunningAsTrigger(OnceTriggerStateMachine mockTrigger, 
IntervalWindow window)
+      throws Exception {
+
+    // Don't fire due to mock saying no
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    assertFalse(tester.shouldFire(window)); // not ready
+
+    // Fire due to mock trigger; early trigger is required to be a OnceTrigger
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // ready
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * With only early firings configured: the trigger follows the early mock before the
+   * watermark, then fires once the watermark reaches the end of the window, after which
+   * the window is marked finished.
+   */
+  @Test
+  public void testEarlyAndAtWatermark() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(mockEarly),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * With only late firings configured: nothing fires before the watermark, the watermark
+   * firing does not finish the window, and subsequent firings follow the late mock.
+   */
+  @Test
+  public void testAtWatermarkAndLate() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // No early firing, just double checking
+    // (mockEarly is not part of the trigger built above, so its answer must be ignored)
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertFalse(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  /**
+   * With both early and late firings configured: the trigger follows the early mock, fires
+   * at the watermark without finishing the window, and then follows the late mock.
+   */
+  @Test
+  public void testEarlyAndAtWatermarkAndLate() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(mockEarly)
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  /**
+   * Tests that if the end-of-window trigger has already finished in both of the windows
+   * being merged, then it is also finished in the merged window.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            AfterWatermarkStateMachine.pastEndOfWindow(),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it finished
+    tester.mergeWindows();
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window when the
+   * end-of-window trigger had finished in only one of the windows being merged.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeRewinds() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            AfterWatermarkStateMachine.pastEndOfWindow(),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first 
window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the watermark trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that if the end-of-window firing has already happened in both of the windows
+   * being merged, then the merged window stays on the late trigger.
+   *
+   * <p>The early trigger needs 100 elements and so never fires here; the late trigger
+   * needs only one element, which is what the probing steps rely on.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(AfterPaneStateMachine.elementCountAtLeast(100))
+            .withLateFirings(AfterPaneStateMachine.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both 
windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it on the late trigger
+    tester.mergeWindows();
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window when the
+   * end-of-window firing had happened in only one of the windows being merged.
+   *
+   * <p>The early trigger needs 100 elements and so never fires here; the late trigger
+   * needs only one element, which is what the probing steps rely on.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeRewinds() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(AfterPaneStateMachine.elementCountAtLeast(100))
+            .withLateFirings(AfterPaneStateMachine.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only 
the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the early trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the late trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /** The bare end-of-window trigger renders as {@code AfterWatermark.pastEndOfWindow()}. */
+  @Test
+  public void testFromEndOfWindowToString() {
+    String rendered = AfterWatermarkStateMachine.pastEndOfWindow().toString();
+    assertEquals("AfterWatermark.pastEndOfWindow()", rendered);
+  }
+
+  /** An early-firings trigger is appended to the rendering as {@code .withEarlyFirings(..)}. */
+  @Test
+  public void testEarlyFiringsToString() {
+    String rendered =
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(StubTriggerStateMachine.named("t1"))
+            .toString();
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", rendered);
+  }
+
+  /** A late-firings trigger is appended to the rendering as {@code .withLateFirings(..)}. */
+  @Test
+  public void testLateFiringsToString() {
+    String rendered =
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withLateFirings(StubTriggerStateMachine.named("t1"))
+            .toString();
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", rendered);
+  }
+
+  /** With both configured, the early firings are rendered before the late firings. */
+  @Test
+  public void testEarlyAndLateFiringsToString() {
+    String rendered =
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(StubTriggerStateMachine.named("t1"))
+            .withLateFirings(StubTriggerStateMachine.named("t2"))
+            .toString();
+
+    assertEquals(
+        "AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
+        rendered);
+  }
+
+  /** Early/late firings set to the never-trigger are left out of the rendering. */
+  @Test
+  public void testToStringExcludesNeverTrigger() {
+    String rendered =
+        AfterWatermarkStateMachine.pastEndOfWindow()
+            .withEarlyFirings(NeverStateMachine.ever())
+            .withLateFirings(NeverStateMachine.ever())
+            .toString();
+
+    assertEquals("AfterWatermark.pastEndOfWindow()", rendered);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java
new file mode 100644
index 0000000..b11d319
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link DefaultTriggerStateMachine}, which should be equivalent to
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ *
+ * <p>Each test checks readiness per window as the input watermark advances, that a window
+ * remains ready after firing, and that no window is ever marked finished.
+ */
+@RunWith(JUnit4.class)
+public class DefaultTriggerStateMachineTest {
+
+  /** Tester for the trigger under test; assigned at the start of each test method. */
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+  /** Fixed 100ms windows: each window becomes ready when the watermark reaches its end. */
+  @Test
+  public void testDefaultTriggerFixedWindows() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        DefaultTriggerStateMachine.of(),
+        FixedWindows.of(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [0, 100)
+        101); // [100, 200)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
+
+    // Advance the watermark almost to the end of the first window.
+    tester.advanceInputWatermark(new Instant(99));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark past end of the first window, which is then ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Fire, but the first window is still allowed to fire
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark to 200, then both are ready
+    tester.advanceInputWatermark(new Instant(200));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+  }
+
+  /** Sliding 100ms windows every 50ms: overlapping windows become ready independently. */
+  @Test
+  public void testDefaultTriggerSlidingWindows() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        DefaultTriggerStateMachine.of(),
+        SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
+
+    tester.injectElements(
+        1, // [-50, 50), [0, 100)
+        50); // [0, 100), [50, 150)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
+    IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
+
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 50, the first becomes ready; it stays ready after firing
+    tester.advanceInputWatermark(new Instant(50));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 99, the first is still the only one ready
+    tester.advanceInputWatermark(new Instant(99));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 100, the first and second are ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+    assertFalse(tester.isMarkedFinished(thirdWindow));
+  }
+
+  /** Sessions with a 100ms gap: readiness is probed on the original and merged windows. */
+  @Test
+  public void testDefaultTriggerSessions() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        DefaultTriggerStateMachine.of(),
+        Sessions.withGapDuration(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [1, 101)
+        50); // [50, 150)
+    tester.mergeWindows();
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
+
+    // Not ready in any window yet
+    tester.advanceInputWatermark(new Instant(100));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // The first window is "ready": the caller owns knowledge of which windows are merged away
+    tester.advanceInputWatermark(new Instant(149));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now ready on all windows
+    tester.advanceInputWatermark(new Instant(150));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    // Ensure it repeats
+    tester.fireIfShouldFire(mergedWindow);
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    assertFalse(tester.isMarkedFinished(mergedWindow));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java
new file mode 100644
index 0000000..744c220
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ExecutableTriggerStateMachine}.
+ *
+ * <p>The tests build trees of stub triggers and check the index assigned to each node
+ * and the lookup of the sub-trigger containing a given index.
+ */
+@RunWith(JUnit4.class)
+public class ExecutableTriggerStateMachineTest {
+
+  /** A trigger with no sub-triggers is assigned index 0. */
+  @Test
+  public void testIndexAssignmentLeaf() throws Exception {
+    StubStateMachine t1 = new StubStateMachine();
+    ExecutableTriggerStateMachine executable = 
ExecutableTriggerStateMachine.create(t1);
+    assertEquals(0, executable.getTriggerIndex());
+  }
+
+  /** A root with two children: the root gets index 0 and the children 1 and 2, in order. */
+  @Test
+  public void testIndexAssignmentOneLevel() throws Exception {
+    StubStateMachine t1 = new StubStateMachine();
+    StubStateMachine t2 = new StubStateMachine();
+    StubStateMachine t = new StubStateMachine(t1, t2);
+
+    ExecutableTriggerStateMachine executable = 
ExecutableTriggerStateMachine.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertSame(t1, executable.subTriggers().get(0).getSpec());
+    assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
+    assertSame(t2, executable.subTriggers().get(1).getSpec());
+  }
+
+  /**
+   * A two-level tree: the first child (with four children of its own) gets index 1 and the
+   * second child gets index 6, and index lookups resolve to the child whose subtree holds
+   * the index.
+   */
+  @Test
+  public void testIndexAssignmentTwoLevel() throws Exception {
+    StubStateMachine t11 = new StubStateMachine();
+    StubStateMachine t12 = new StubStateMachine();
+    StubStateMachine t13 = new StubStateMachine();
+    StubStateMachine t14 = new StubStateMachine();
+    StubStateMachine t21 = new StubStateMachine();
+    StubStateMachine t22 = new StubStateMachine();
+    StubStateMachine t1 = new StubStateMachine(t11, t12, t13, t14);
+    StubStateMachine t2 = new StubStateMachine(t21, t22);
+    StubStateMachine t = new StubStateMachine(t1, t2);
+
+    ExecutableTriggerStateMachine executable = 
ExecutableTriggerStateMachine.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertEquals(6, 
executable.subTriggers().get(0).getFirstIndexAfterSubtree());
+    assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
+
+    // Indices 1 through 5 are expected to resolve to t1; 6 and 7 to t2.
+    assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
+  }
+
+  /**
+   * Trigger whose callbacks do nothing and whose {@code shouldFire} always returns false;
+   * the tests above use it only for its sub-trigger structure.
+   */
+  private static class StubStateMachine extends TriggerStateMachine {
+
+    /** Creates a stub with the given sub-triggers, passed to the superclass as a list. */
+    @SafeVarargs
+    protected StubStateMachine(TriggerStateMachine... subTriggers) {
+      super(Arrays.asList(subTriggers));
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception { }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception { }
+
+    @Override
+    public void clear(TriggerContext c) throws Exception {
+    }
+
+    @Override
+    public boolean shouldFire(TriggerContext c) {
+      return false;
+    }
+
+    @Override
+    public void onFire(TriggerContext c) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java
new file mode 100644
index 0000000..16bf49d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersBitSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersBitSetTest {
+  /**
+   * Tests that after a trigger is set to finished, it reads back as finished.
+   */
+  @Test
+  public void testSetGet() {
+    
FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that trigger's subTriggers,
+   * but no others.
+   */
+  @Test
+  public void testClearRecursively() {
+    
FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
+  }
+
+  /**
+   * Tests that {@code copy()} returns a set whose {@code getBitSet()} result is not the
+   * same instance as the original's.
+   */
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersBitSet finishedSet = 
FinishedTriggersBitSet.emptyWithCapacity(10);
+    assertThat(finishedSet.copy().getBitSet(), 
not(theInstance(finishedSet.getBitSet())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java
new file mode 100644
index 0000000..31d17c1
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Generalized tests for {@link FinishedTriggers} implementations.
+ */
+public class FinishedTriggersProperties {
+  /**
+   * Tests that for the provided trigger and {@link FinishedTriggers}, when 
the trigger is set
+   * finished, it is correctly reported as finished.
+   */
+  public static void verifyGetAfterSet(
+      FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    finishedSet.setFinished(trigger, true);
+    assertTrue(finishedSet.isFinished(trigger));
+  }
+
+  /**
+   * For a few arbitrary triggers, tests that when the trigger is set finished 
it is correctly
+   * reported as finished.
+   */
+  public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
+    ExecutableTriggerStateMachine trigger =
+        ExecutableTriggerStateMachine.create(
+            AfterAllStateMachine.of(
+                AfterFirstStateMachine.of(
+                    AfterPaneStateMachine.elementCountAtLeast(3),
+                    AfterWatermarkStateMachine.pastEndOfWindow()),
+                AfterAllStateMachine.of(
+                    AfterPaneStateMachine.elementCountAtLeast(10),
+                    
AfterProcessingTimeStateMachine.pastFirstElementInPane())));
+
+    verifyGetAfterSet(finishedSet, trigger);
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(0).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(1).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, 
trigger.subTriggers().get(1).subTriggers().get(0));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that trigger's 
subTriggers, but no
+   * others.
+   */
+  public static void verifyClearRecursively(FinishedTriggers finishedSet) {
+    ExecutableTriggerStateMachine trigger =
+        ExecutableTriggerStateMachine.create(
+            AfterAllStateMachine.of(
+                AfterFirstStateMachine.of(
+                    AfterPaneStateMachine.elementCountAtLeast(3),
+                    AfterWatermarkStateMachine.pastEndOfWindow()),
+                AfterAllStateMachine.of(
+                    AfterPaneStateMachine.elementCountAtLeast(10),
+                    
AfterProcessingTimeStateMachine.pastFirstElementInPane())));
+
+    // Set them all finished. This method is not on a trigger as it makes no 
sense outside tests.
+    setFinishedRecursively(finishedSet, trigger);
+    assertTrue(finishedSet.isFinished(trigger));
+    assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
+    
assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
+    
assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
+
+    // Clear just the second AfterAll
+    finishedSet.clearRecursively(trigger.subTriggers().get(1));
+
+    // Check that the root and the first subtree are still finished, and the second is cleared
+    assertTrue(finishedSet.isFinished(trigger));
+    verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
+    verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
+  }
+
+  private static void setFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) {
+    finishedSet.setFinished(trigger, true);
+    for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
+      setFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  private static void verifyFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) {
+    assertTrue(finishedSet.isFinished(trigger));
+    for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
+      verifyFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  private static void verifyUnfinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) {
+      verifyUnfinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java
new file mode 100644
index 0000000..fae73f3
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersSetTest {
+  /**
+   * Tests that after a trigger is set to finished, it reads back as finished.
+   */
+  @Test
+  public void testSetGet() {
+    FinishedTriggersProperties.verifyGetAfterSet(
+        FinishedTriggersSet.fromSet(new 
HashSet<ExecutableTriggerStateMachine>()));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that trigger's 
subTriggers, but no
+   * others.
+   */
+  @Test
+  public void testClearRecursively() {
+    FinishedTriggersProperties.verifyClearRecursively(
+        FinishedTriggersSet.fromSet(new 
HashSet<ExecutableTriggerStateMachine>()));
+  }
+
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersSet finishedSet =
+        FinishedTriggersSet.fromSet(new 
HashSet<ExecutableTriggerStateMachine>());
+    assertThat(finishedSet.copy().getFinishedTriggers(),
+        not(theInstance(finishedSet.getFinishedTriggers())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java
new file mode 100644
index 0000000..6d8a344
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link NeverStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class NeverStateMachineTest {
+  private SimpleTriggerStateMachineTester<IntervalWindow> triggerTester;
+
+  @Before
+  public void setup() throws Exception {
+    triggerTester =
+        TriggerStateMachineTester.forTrigger(
+            NeverStateMachine.ever(), 
FixedWindows.of(Duration.standardMinutes(5)));
+  }
+
+  @Test
+  public void falseAfterEndOfWindow() throws Exception {
+    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardMinutes(5)));
+    assertThat(triggerTester.shouldFire(window), is(false));
+    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(triggerTester.shouldFire(window), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java
new file mode 100644
index 0000000..6e093c5
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link OrFinallyStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class OrFinallyStateMachineTest {
+
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires and finishes, the {@code OrFinally} also fires and finishes.
+   */
+  @Test
+  public void testActualFiresAndFinishes() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        new OrFinallyStateMachine(
+            AfterPaneStateMachine.elementCountAtLeast(2),
+            AfterPaneStateMachine.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires and finishes
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires but does not finish, the {@code OrFinally} also fires and also does 
not
+   * finish.
+   */
+  @Test
+  public void testActualFiresOnly() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        new OrFinallyStateMachine(
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)),
+            AfterPaneStateMachine.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but does not finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // And again
+    tester.injectElements(3, 4);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that if the first trigger rewinds to be non-finished in the merged 
window,
+   * then it becomes the currently active trigger again, with real triggers.
+   */
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            AfterPaneStateMachine.elementCountAtLeast(5)
+                .orFinally(AfterWatermarkStateMachine.pastEndOfWindow()),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    // Finished the orFinally in the first window
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Set up second window where it is not done
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them, if the merged window were on the second trigger, it would 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now adding five more elements makes the main trigger ready to fire
+    tester.injectElements(1, 2, 3, 4, 5);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, until)} when {@code actual}
+   * fires but does not finish, then {@code until} fires and finishes, the
+   * whole thing fires and finishes.
+   */
+  @Test
+  public void testActualFiresButUntilFinishes() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        new OrFinallyStateMachine(
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)),
+                AfterPaneStateMachine.elementCountAtLeast(3)),
+        FixedWindows.of(Duration.millis(10)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    // Before any firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but doesn't finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The until fires and finishes; the trigger is finished
+    tester.injectElements(3);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger =
+        
StubTriggerStateMachine.named("t1").orFinally(StubTriggerStateMachine.named("t2"));
+    assertEquals("t1.orFinally(t2)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java
new file mode 100644
index 0000000..b52f41d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link RepeatedlyStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class RepeatedlyStateMachineTest {
+
+  @Mock private TriggerStateMachine mockTrigger;
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+  private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+    return Mockito.<TriggerStateMachine.TriggerContext>any();
+  }
+
+  public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws 
Exception {
+    MockitoAnnotations.initMocks(this);
+    tester = TriggerStateMachineTester
+        .forTrigger(RepeatedlyStateMachine.forever(mockTrigger), windowFn);
+  }
+
+  /**
+   * Tests that onElement correctly passes the data on to the subtrigger.
+   */
+  @Test
+  public void testOnElement() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(37);
+    
verify(mockTrigger).onElement(Mockito.<TriggerStateMachine.OnElementContext>any());
+  }
+
+  /**
+   * Tests that the repeatedly is ready to fire whenever the subtrigger is 
ready.
+   */
+  @Test
+  public void testShouldFire() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+
+    
when(mockTrigger.shouldFire(Mockito.<TriggerStateMachine.TriggerContext>any()))
+        .thenReturn(false);
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+  }
+
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them; with both elements in the merged window, the count-of-2 trigger is ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstElementCount() throws Exception {
+    SimpleTriggerStateMachineTester<GlobalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            RepeatedlyStateMachine.forever(
+                AfterFirstStateMachine.of(
+                    AfterProcessingTimeStateMachine.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPaneStateMachine.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+    SimpleTriggerStateMachineTester<GlobalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            RepeatedlyStateMachine.forever(
+                AfterFirstStateMachine.of(
+                    AfterProcessingTimeStateMachine.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPaneStateMachine.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyElementCount() throws Exception {
+    SimpleTriggerStateMachineTester<GlobalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(5)),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+  @Test
+  public void testRepeatedlyProcessingTime() throws Exception {
+    SimpleTriggerStateMachineTester<GlobalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            RepeatedlyStateMachine.forever(
+                    AfterProcessingTimeStateMachine.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window));
+  }
+
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger = RepeatedlyStateMachine.forever(new 
StubTriggerStateMachine() {
+        @Override
+        public String toString() {
+          return "innerTrigger";
+        }
+      });
+
+    assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java
new file mode 100644
index 0000000..ef74bb5
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReshuffleTriggerStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class ReshuffleTriggerStateMachineTest {
+
+  /** Public so that other tests can instantiate {@link 
ReshuffleTriggerStateMachine}. */
+  public static <W extends BoundedWindow> ReshuffleTriggerStateMachine 
forTest() {
+    return new ReshuffleTriggerStateMachine();
+  }
+
+  @Test
+  public void testShouldFire() throws Exception {
+    TriggerStateMachineTester<Integer, IntervalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            new ReshuffleTriggerStateMachine(), 
FixedWindows.of(Duration.millis(100)));
+    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new 
Instant(400));
+    assertTrue(tester.shouldFire(arbitraryWindow));
+  }
+
+  @Test
+  public void testOnTimer() throws Exception {
+    TriggerStateMachineTester<Integer, IntervalWindow> tester =
+        TriggerStateMachineTester.forTrigger(
+            new ReshuffleTriggerStateMachine(), 
FixedWindows.of(Duration.millis(100)));
+    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new 
Instant(200));
+    tester.fireIfShouldFire(arbitraryWindow);
+    assertFalse(tester.isMarkedFinished(arbitraryWindow));
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger = new ReshuffleTriggerStateMachine();
+    assertEquals("ReshuffleTriggerStateMachine()", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
new file mode 100644
index 0000000..4512848
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.collect.Lists;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+
+/**
+ * An {@link OnceTriggerStateMachine} for tests: every callback is a no-op and
+ * {@link #shouldFire} always answers {@code false}.
+ */
+abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
+
+  /** Creates a stub with an empty, freshly allocated list of sub-triggers. */
+  protected StubTriggerStateMachine() {
+    super(Lists.<TriggerStateMachine>newArrayList());
+  }
+
+  /**
+   * Returns a stub {@link TriggerStateMachine} whose {@link #toString()} yields exactly
+   * {@code name}.
+   */
+  static StubTriggerStateMachine named(final String name) {
+    return new StubTriggerStateMachine() {
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /** Always {@code false}: the stub never asks to fire. */
+  @Override
+  public boolean shouldFire(TriggerContext context) throws Exception {
+    return false;
+  }
+
+  /** Does nothing when an element arrives. */
+  @Override
+  public void onElement(OnElementContext c) throws Exception {}
+
+  /** Does nothing when windows are merged. */
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {}
+
+  /** Does nothing on the single firing. */
+  @Override
+  protected void onOnlyFiring(TriggerContext context) throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java
new file mode 100644
index 0000000..e06eb85
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests covering the behaviour implemented directly by {@link TriggerStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class TriggerStateMachineTest {
+
+  /** Checks the string form of a leaf trigger and of a composite wrapping it. */
+  @Test
+  public void testTriggerToString() throws Exception {
+    String leaf = AfterWatermarkStateMachine.pastEndOfWindow().toString();
+    assertEquals("AfterWatermark.pastEndOfWindow()", leaf);
+
+    String wrapped =
+        RepeatedlyStateMachine.forever(AfterWatermarkStateMachine.pastEndOfWindow()).toString();
+    assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", wrapped);
+  }
+
+  /**
+   * Two triggers are compatible when their own classes match and their sub-triggers match;
+   * a differing class at either level makes them incompatible.
+   */
+  @Test
+  public void testIsCompatible() throws Exception {
+    // Same class, no sub-triggers on either side.
+    TriggerStateMachine bare = new Trigger1(null);
+    assertTrue(bare.isCompatible(new Trigger1(null)));
+
+    // Same class, same class of sub-trigger.
+    TriggerStateMachine withSecondChild = new Trigger1(only(new Trigger2(null)));
+    assertTrue(withSecondChild.isCompatible(new Trigger1(only(new Trigger2(null)))));
+
+    // Different class at the root.
+    TriggerStateMachine otherBare = new Trigger1(null);
+    assertFalse(otherBare.isCompatible(new Trigger2(null)));
+
+    // Same class at the root, different class of sub-trigger.
+    TriggerStateMachine withFirstChild = new Trigger1(only(new Trigger1(null)));
+    assertFalse(withFirstChild.isCompatible(new Trigger1(only(new Trigger2(null)))));
+  }
+
+  /** Wraps a single trigger in a list suitable for use as a sub-trigger list. */
+  private static List<TriggerStateMachine> only(TriggerStateMachine subTrigger) {
+    return Arrays.<TriggerStateMachine>asList(subTrigger);
+  }
+
+  /** First of two distinct do-nothing trigger classes; never fires. */
+  private static class Trigger1 extends TriggerStateMachine {
+
+    private Trigger1(List<TriggerStateMachine> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      return false;
+    }
+
+    @Override
+    public void onElement(TriggerStateMachine.OnElementContext c) {}
+
+    @Override
+    public void onMerge(TriggerStateMachine.OnMergeContext c) {}
+
+    @Override
+    public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {}
+  }
+
+  /** Second of two distinct do-nothing trigger classes; never fires. */
+  private static class Trigger2 extends TriggerStateMachine {
+
+    private Trigger2(List<TriggerStateMachine> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      return false;
+    }
+
+    @Override
+    public void onElement(TriggerStateMachine.OnElementContext c) {}
+
+    @Override
+    public void onMerge(TriggerStateMachine.OnMergeContext c) {}
+
+    @Override
+    public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
new file mode 100644
index 0000000..1ccca17
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
+import org.apache.beam.sdk.util.MergingActiveWindowSet;
+import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Test utility that runs a {@link TriggerStateMachine}, using an in-memory stub implementation
+ * to provide the {@link StateInternals}.
+ *
+ * @param <InputT> The type of the elements injected into the trigger under test.
+ * @param <W> The type of windows being used.
+ */
+public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
+
+  /**
+   * A {@link TriggerStateMachineTester} specialized to {@link Integer} values, so elements and
+   * timestamps can be conflated. Today, triggers should not observe the element type, so this is
+   * the only trigger tester that needs to be used.
+   */
+  public static class SimpleTriggerStateMachineTester<W extends BoundedWindow>
+      extends TriggerStateMachineTester<Integer, W> {
+
+    private SimpleTriggerStateMachineTester(
+        ExecutableTriggerStateMachine executableTriggerStateMachine,
+        WindowFn<Object, W> windowFn,
+        Duration allowedLateness)
+        throws Exception {
+      super(executableTriggerStateMachine, windowFn, allowedLateness);
+    }
+
+    /**
+     * Injects each value as an element whose timestamp is {@code new Instant(value)}, all in a
+     * single chunk.
+     */
+    public void injectElements(int... values) throws Exception {
+      List<TimestampedValue<Integer>> timestampedValues =
+          Lists.newArrayListWithCapacity(values.length);
+      for (int value : values) {
+        timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
+      }
+      injectElements(timestampedValues);
+    }
+
+    /**
+     * Returns a new tester for the same trigger and {@link WindowFn}. The result is a separate
+     * instance with its own state and timers; nothing is copied from this tester.
+     *
+     * <p>NOTE(review): the base-class constructor currently does not read
+     * {@code allowedLateness}, so the value has no observable effect.
+     */
+    public SimpleTriggerStateMachineTester<W> withAllowedLateness(Duration allowedLateness)
+        throws Exception {
+      return new SimpleTriggerStateMachineTester<>(executableTrigger, windowFn, allowedLateness);
+    }
+  }
+
+  private final TestInMemoryStateInternals<?> stateInternals =
+      new TestInMemoryStateInternals<Object>(null /* key */);
+  private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+  private final TriggerStateMachineContextFactory<W> contextFactory;
+  protected final WindowFn<Object, W> windowFn;
+  private final ActiveWindowSet<W> activeWindows;
+
+  /** Maps each window merged by the last {@link #mergeWindows()} call to its merge result. */
+  private final Map<W, W> windowToMergeResult;
+
+  /**
+   * An {@link ExecutableTriggerStateMachine} under test.
+   */
+  protected final ExecutableTriggerStateMachine executableTrigger;
+
+  /**
+   * A map from a window and trigger to whether that trigger is finished for the window.
+   */
+  private final Map<W, FinishedTriggers> finishedSets;
+
+  /**
+   * Creates a {@link SimpleTriggerStateMachineTester} for the given trigger and {@link WindowFn},
+   * with an allowed lateness of {@link Duration#ZERO}.
+   *
+   * <p>No {@link AccumulationMode} is selected here: nothing in this tester consumes one.
+   */
+  public static <W extends BoundedWindow> SimpleTriggerStateMachineTester<W> forTrigger(
+      TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn)
+          throws Exception {
+
+    ExecutableTriggerStateMachine executableTriggerStateMachine =
+        ExecutableTriggerStateMachine.create(stateMachine);
+
+    return new SimpleTriggerStateMachineTester<>(
+        executableTriggerStateMachine, windowFn, Duration.ZERO);
+  }
+
+  /**
+   * Creates a tester with an arbitrary element type for the given trigger and {@link WindowFn},
+   * with an allowed lateness of {@link Duration#ZERO}.
+   *
+   * <p>No {@link AccumulationMode} is selected here: nothing in this tester consumes one.
+   */
+  public static <InputT, W extends BoundedWindow>
+  TriggerStateMachineTester<InputT, W> forAdvancedTrigger(
+          TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception {
+    ExecutableTriggerStateMachine executableTriggerStateMachine =
+        ExecutableTriggerStateMachine.create(stateMachine);
+
+    return new TriggerStateMachineTester<>(executableTriggerStateMachine, windowFn, Duration.ZERO);
+  }
+
+  /**
+   * Sets up the active window set (merging or non-merging, according to
+   * {@code windowFn.isNonMerging()}) and the context factory for the trigger under test.
+   *
+   * <p>NOTE(review): {@code allowedLateness} is accepted but not read by this constructor.
+   */
+  protected TriggerStateMachineTester(
+      ExecutableTriggerStateMachine executableTriggerStateMachine,
+      WindowFn<Object, W> windowFn,
+      Duration allowedLateness) throws Exception {
+    this.windowFn = windowFn;
+    this.executableTrigger = executableTriggerStateMachine;
+    this.finishedSets = new HashMap<>();
+
+    this.activeWindows =
+        windowFn.isNonMerging()
+            ? new NonMergingActiveWindowSet<W>()
+            : new MergingActiveWindowSet<W>(windowFn, stateInternals);
+    this.windowToMergeResult = new HashMap<>();
+
+    this.contextFactory =
+        new TriggerStateMachineContextFactory<>(
+            windowFn, stateInternals, activeWindows);
+  }
+
+  /**
+   * Instructs the trigger to clear its state for the given window.
+   */
+  public void clearState(W window) throws Exception {
+    executableTrigger.invokeClear(contextFactory.base(window,
+        new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)));
+  }
+
+  /**
+   * Asserts that the trigger has actually cleared all of its state for the given window. Since
+   * the trigger under test is the root, this makes the assert for all triggers regardless
+   * of their position in the trigger tree.
+   */
+  public void assertCleared(W window) {
+    for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) {
+      if (untypedNamespace instanceof WindowAndTriggerNamespace) {
+        @SuppressWarnings("unchecked")
+        WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace;
+        if (namespace.getWindow().equals(window)) {
+          Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
+          assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns {@code true} if the {@link TriggerStateMachine} under test is finished for the given
+   * window. A window for which no finished set has been created yet is reported as not finished.
+   */
+  public boolean isMarkedFinished(W window) {
+    FinishedTriggers finishedSet = finishedSets.get(window);
+    if (finishedSet == null) {
+      return false;
+    }
+
+    return finishedSet.isFinished(executableTrigger);
+  }
+
+  /** Returns the state namespace of {@code window}, which must not be {@code null}. */
+  private StateNamespace windowNamespace(W window) {
+    return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window));
+  }
+
+  /**
+   * Advance the input watermark to the specified time, then advance the output watermark as far as
+   * possible. The timer callback supplied is {@link TimerCallback#NO_OP}.
+   */
+  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
+  }
+
+  /**
+   * Advance the processing time to the specified time. The timer callback supplied is
+   * {@link TimerCallback#NO_OP}.
+   */
+  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
+  }
+
+  /**
+   * Inject all the timestamped values (after passing through the window function) as if they
+   * arrived in a single chunk of a bundle (or work-unit).
+   */
+  @SafeVarargs
+  public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+    injectElements(Arrays.asList(values));
+  }
+
+  /**
+   * Collection form of {@link #injectElements(TimestampedValue[])}. Works in two passes: first
+   * every value is assigned to windows, which are marked active and given an end-of-window
+   * event-time timer; then the trigger's {@code onElement} is invoked per value and window,
+   * skipping windows for which the trigger is already finished.
+   */
+  public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
+    for (TimestampedValue<InputT> value : values) {
+      WindowTracing.trace("TriggerTester.injectElements: {}", value);
+    }
+
+    List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
+
+    for (TimestampedValue<InputT> input : values) {
+      try {
+        InputT value = input.getValue();
+        Instant timestamp = input.getTimestamp();
+        Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
+            windowFn, value, timestamp, GlobalWindow.INSTANCE));
+
+        for (W window : assignedWindows) {
+          activeWindows.addActiveForTesting(window);
+
+          // Today, triggers assume onTimer firing at the watermark time, whether or not they
+          // explicitly set the timer themselves. So this tester must set it.
+          timerInternals.setTimer(
+              TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
+        }
+
+        windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    for (WindowedValue<InputT> windowedValue : windowedValues) {
+      for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
+        // SDK is responsible for type safety
+        @SuppressWarnings("unchecked")
+        W window = mergeResult((W) untypedWindow);
+
+        TriggerStateMachine.OnElementContext context =
+            contextFactory.createOnElementContext(
+                window,
+                new TestTimers(windowNamespace(window)),
+                windowedValue.getTimestamp(),
+                executableTrigger,
+                getFinishedSet(window));
+
+        if (!context.trigger().isFinished()) {
+          executableTrigger.invokeOnElement(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns what the trigger under test answers to {@code shouldFire} for the given window,
+   * after prefetching the state it declares for that question. Does not fire the trigger.
+   */
+  public boolean shouldFire(W window) throws Exception {
+    TriggerStateMachine.TriggerContext context = contextFactory.base(
+        window,
+        new TestTimers(windowNamespace(window)),
+        executableTrigger, getFinishedSet(window));
+    executableTrigger.getSpec().prefetchShouldFire(context.state());
+    return executableTrigger.invokeShouldFire(context);
+  }
+
+  /**
+   * Asks the trigger whether it should fire for the given window and, if so, invokes its
+   * {@code onFire}. When the trigger is finished after firing, the window is removed from the
+   * active set and the trigger's state for it is cleared.
+   */
+  public void fireIfShouldFire(W window) throws Exception {
+    TriggerStateMachine.TriggerContext context = contextFactory.base(
+        window,
+        new TestTimers(windowNamespace(window)),
+        executableTrigger, getFinishedSet(window));
+
+    executableTrigger.getSpec().prefetchShouldFire(context.state());
+    if (executableTrigger.invokeShouldFire(context)) {
+      executableTrigger.getSpec().prefetchOnFire(context.state());
+      executableTrigger.invokeOnFire(context);
+      if (context.trigger().isFinished()) {
+        activeWindows.remove(window);
+        executableTrigger.invokeClear(context);
+      }
+    }
+  }
+
+  /**
+   * Directly sets the finished bit, for the given window, of the sub-trigger at
+   * {@code subTriggerIndex} in the root trigger's sub-trigger list.
+   */
+  public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
+    getFinishedSet(window)
+        .setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value);
+  }
+
+  /**
+   * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
+   * events on to the trigger under test. Does not persist the fact that merging happened,
+   * since it is just to test the trigger's {@code OnMerge} method.
+   */
+  public final void mergeWindows() throws Exception {
+    windowToMergeResult.clear();
+    activeWindows.merge(new MergeCallback<W>() {
+      @Override
+      public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {}
+
+      @Override
+      public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+        // Record every merged window, but hand only the still-active ones to the trigger.
+        List<W> activeToBeMerged = new ArrayList<>();
+        for (W window : toBeMerged) {
+          windowToMergeResult.put(window, mergeResult);
+          if (activeWindows.isActive(window)) {
+            activeToBeMerged.add(window);
+          }
+        }
+        Map<W, FinishedTriggers> mergingFinishedSets =
+            Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
+        for (W oldWindow : activeToBeMerged) {
+          mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
+        }
+        executableTrigger.invokeOnMerge(
+            contextFactory.createOnMergeContext(
+                mergeResult,
+                new TestTimers(windowNamespace(mergeResult)),
+                executableTrigger,
+                getFinishedSet(mergeResult),
+                mergingFinishedSets));
+        // As in injectElements, the tester sets the end-of-window timer on the trigger's behalf.
+        timerInternals.setTimer(TimerData.of(
+            windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
+      }
+    });
+  }
+
+  /**
+   * Returns the window that {@code window} was merged into by the most recent
+   * {@link #mergeWindows()} call, or {@code window} itself if it was not merged.
+   */
+  public W mergeResult(W window) {
+    W result = windowToMergeResult.get(window);
+    return result == null ? window : result;
+  }
+
+  /** Returns the finished set for {@code window}, creating and storing an empty one if absent. */
+  private FinishedTriggers getFinishedSet(W window) {
+    FinishedTriggers finishedSet = finishedSets.get(window);
+    if (finishedSet == null) {
+      finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>());
+      finishedSets.put(window, finishedSet);
+    }
+    return finishedSet;
+  }
+
+  /** An assign context that simply returns the element, timestamp and window it was given. */
+  private static class TestAssignContext<W extends BoundedWindow>
+      extends WindowFn<Object, W>.AssignContext {
+    private final Object element;
+    private final Instant timestamp;
+    private final BoundedWindow window;
+
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
+      // AssignContext is an inner class of WindowFn, so it needs an enclosing instance.
+      windowFn.super();
+      this.element = element;
+      this.timestamp = timestamp;
+      this.window = window;
+    }
+
+    @Override
+    public Object element() {
+      return element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+  }
+
+  /**
+   * {@link Timers} bound to one window namespace; every call is forwarded to the tester's
+   * {@link InMemoryTimerInternals}.
+   */
+  private class TestTimers implements Timers {
+    private final StateNamespace namespace;
+
+    public TestTimers(StateNamespace namespace) {
+      checkArgument(namespace instanceof WindowNamespace);
+      this.namespace = namespace;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+  }
+}

Reply via email to