http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
new file mode 100644
index 0000000..7e6e938
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeTest {
+
+  private Trigger underTest = new AfterSynchronizedProcessingTime();
+
+  @Test
+  public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+
+    // Timer at 15
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.advanceProcessingTime(new Instant(12));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    // Load up elements in the next window, timer at 17 for them
+    tester.injectElements(11, 12, 13);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(20));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Not quite time to fire
+    tester.advanceProcessingTime(new Instant(14));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Timer at 19 for these in the first window; it should be ignored since 
the 15 will fire first
+    tester.injectElements(2, 3);
+
+    // Advance past the first timer and fire, finishing the first window
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.isMarkedFinished(firstWindow));
+
+    // The next window fires and finishes now
+    tester.advanceProcessingTime(new Instant(18));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(secondWindow);
+    assertTrue(tester.isMarkedFinished(secondWindow));
+  }
+
+  @Test
+  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+        AfterProcessingTime
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        Sessions.withGapDuration(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+    tester.injectElements(1); // in [1, 11), timer for 15
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.advanceProcessingTime(new Instant(12));
+    tester.injectElements(3); // in [3, 13), timer for 17
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new 
Instant(13));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(13));
+
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+        underTest.getWatermarkThatGuaranteesFiring(
+            new IntervalWindow(new Instant(0), new Instant(10))));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    assertEquals(underTest, underTest.getContinuationTrigger());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
new file mode 100644
index 0000000..084027b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests the {@link AfterWatermark} triggers.
+ */
+@RunWith(JUnit4.class)
+public class AfterWatermarkTest {
+
+  @Mock private OnceTrigger mockEarly;
+  @Mock private OnceTrigger mockLate;
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+  private static Trigger.OnElementContext anyElementContext() {
+    return Mockito.<Trigger.OnElementContext>any();
+  }
+
+  private void injectElements(int... elements) throws Exception {
+    doNothing().when(mockEarly).onElement(anyElementContext());
+    doNothing().when(mockLate).onElement(anyElementContext());
+    for (int element : elements) {
+      tester.injectElements(element);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
+      throws Exception {
+
+    // Don't fire due to mock saying no
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    assertFalse(tester.shouldFire(window)); // not ready
+
+    // Fire due to mock trigger; the sub-trigger is required to be a OnceTrigger
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // ready
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testEarlyAndAtWatermark() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(mockEarly),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testAtWatermarkAndLate() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+    // No early firing, just double checking
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertFalse(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  @Test
+  public void testEarlyAndAtWatermarkAndLate() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(mockEarly)
+            .withLateFirings(mockLate),
+        FixedWindows.of(Duration.millis(100)));
+
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+    testRunningAsTrigger(mockEarly, window);
+
+    // Fire due to watermark
+    when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    testRunningAsTrigger(mockLate, window);
+  }
+
+  /**
+   * Tests that if the EOW is finished in both as well as the merged window, then
+   * it is finished in the merged result.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterWatermark.pastEndOfWindow(),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it finished
+    tester.mergeWindows();
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testOnMergeRewinds() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterWatermark.pastEndOfWindow(),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the second trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the watermark trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the second trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that if the EOW is finished in both as well as the merged window, then
+   * it is finished in the merged result.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+            .withLateFirings(AfterPane.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.injectElements(1);
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    tester.fireIfShouldFire(secondWindow);
+
+    // Merging should leave it on the late trigger
+    tester.mergeWindows();
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that the trigger rewinds to be non-finished in the merged window.
+   *
+   * <p>Because windows are discarded when a trigger finishes, we need to embed this
+   * in a sequence in order to check that it is re-activated. So this test is potentially
+   * sensitive to other triggers' correctness.
+   */
+  @Test
+  public void testEarlyAndLateOnMergeRewinds() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+            .withLateFirings(AfterPane.elementCountAtLeast(1)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+    // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Confirm that we are on the late trigger by probing
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.injectElements(1);
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merging should re-activate the early trigger in the merged window
+    tester.mergeWindows();
+
+    // Confirm that we are not on the late trigger by probing
+    assertFalse(tester.shouldFire(mergedWindow));
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // And confirm that advancing the watermark fires again
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testFromEndOfWindowToString() {
+    Trigger trigger = AfterWatermark.pastEndOfWindow();
+    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+  }
+
+  @Test
+  public void testEarlyFiringsToString() {
+    Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
+  }
+
+  @Test
+  public void testLateFiringsToString() {
+    Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
+  }
+
+  @Test
+  public void testEarlyAndLateFiringsToString() {
+    Trigger trigger =
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(StubTrigger.named("t1"))
+            .withLateFirings(StubTrigger.named("t2"));
+
+    assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
+        trigger.toString());
+  }
+
+  @Test
+  public void testToStringExcludesNeverTrigger() {
+    Trigger trigger =
+        AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(Never.ever())
+            .withLateFirings(Never.ever());
+
+    assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
new file mode 100644
index 0000000..673e555
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link DefaultTrigger}, which should be equivalent to
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultTriggerTest {
+
+  SimpleTriggerTester<IntervalWindow> tester;
+
+  @Test
+  public void testDefaultTriggerFixedWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        FixedWindows.of(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [0, 100)
+        101); // [100, 200)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new 
Instant(200));
+
+    // Advance the watermark almost to the end of the first window.
+    tester.advanceInputWatermark(new Instant(99));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark past end of the first window, which is then ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Fire, but the first window is still allowed to fire
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Advance watermark to 200, then both are ready
+    tester.advanceInputWatermark(new Instant(200));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+  }
+
+  @Test
+  public void testDefaultTriggerSlidingWindows() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
+
+    tester.injectElements(
+        1, // [-50, 50), [0, 100)
+        50); // [0, 100), [50, 150)
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new 
Instant(50));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new 
Instant(150));
+
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 50, the first becomes ready; it stays ready after firing
+    tester.advanceInputWatermark(new Instant(50));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 99, the first is still the only one ready
+    tester.advanceInputWatermark(new Instant(99));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+
+    // At 100, the first and second are ready
+    tester.advanceInputWatermark(new Instant(100));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(thirdWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    assertFalse(tester.isMarkedFinished(secondWindow));
+    assertFalse(tester.isMarkedFinished(thirdWindow));
+  }
+
+  @Test
+  public void testDefaultTriggerSessions() throws Exception {
+    tester = TriggerTester.forTrigger(
+        DefaultTrigger.of(),
+        Sessions.withGapDuration(Duration.millis(100)));
+
+    tester.injectElements(
+        1, // [1, 101)
+        50); // [50, 150)
+    tester.mergeWindows();
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(101));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new 
Instant(150));
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(150));
+
+    // Not ready in any window yet
+    tester.advanceInputWatermark(new Instant(100));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // The first window is "ready": the caller owns knowledge of which windows 
are merged away
+    tester.advanceInputWatermark(new Instant(149));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now ready on all windows
+    tester.advanceInputWatermark(new Instant(150));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertTrue(tester.shouldFire(secondWindow));
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    // Ensure it repeats
+    tester.fireIfShouldFire(mergedWindow);
+    assertTrue(tester.shouldFire(mergedWindow));
+
+    assertFalse(tester.isMarkedFinished(mergedWindow));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    assertEquals(new Instant(9), 
DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
+        new IntervalWindow(new Instant(0), new Instant(10))));
+    assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
+        
DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    assertEquals(DefaultTrigger.of(), 
DefaultTrigger.of().getContinuationTrigger());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
new file mode 100644
index 0000000..fb2b4d5
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+  private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+  @Before
+  public void setup() throws Exception {
+    triggerTester =
+        TriggerTester.forTrigger(
+            Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
+  }
+
+  @Test
+  public void falseAfterEndOfWindow() throws Exception {
+    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
+    assertThat(triggerTester.shouldFire(window), is(false));
+    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(triggerTester.shouldFire(window), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
new file mode 100644
index 0000000..7289d97
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link OrFinallyTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class OrFinallyTriggerTest {
+
+  private SimpleTriggerTester<IntervalWindow> tester;
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires and finishes, the {@code OrFinally} also fires and finishes.
+   */
+  @Test
+  public void testActualFiresAndFinishes() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            AfterPane.elementCountAtLeast(2),
+            AfterPane.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing: only one of the two required elements has arrived
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires and finishes
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+   * fires but does not finish, the {@code OrFinally} also fires and also does not
+   * finish.
+   */
+  @Test
+  public void testActualFiresOnly() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+            AfterPane.elementCountAtLeast(100)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    // Not yet firing: only one of the two required elements has arrived
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but does not finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // And again, after two more elements
+    tester.injectElements(3, 4);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that if the first trigger rewinds to be non-finished in the merged window,
+   * then it becomes the currently active trigger again, with real triggers.
+   */
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerTester.forTrigger(
+        AfterEach.inOrder(
+            AfterPane.elementCountAtLeast(5)
+                .orFinally(AfterWatermark.pastEndOfWindow()),
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    // Finished the orFinally in the first window
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Set up second window where it is not done
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them, if the merged window were on the second trigger, it would 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now injecting five more elements makes the main trigger ready to fire
+    tester.injectElements(1, 2, 3, 4, 5);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Tests that for {@code OrFinally(actual, until)} when {@code actual}
+   * fires but does not finish, then {@code until} fires and finishes, the
+   * whole thing fires and finishes.
+   */
+  @Test
+  public void testActualFiresButUntilFinishes() throws Exception {
+    tester = TriggerTester.forTrigger(
+        new OrFinallyTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+                AfterPane.elementCountAtLeast(3)),
+        FixedWindows.of(Duration.millis(10)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    // Before any firing
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The actual fires but doesn't finish
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // The until fires and finishes; the trigger is finished
+    tester.injectElements(3);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testFireDeadline() throws Exception {
+    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+    assertEquals(new Instant(9),
+        Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9), 
Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+        .orFinally(AfterPane.elementCountAtLeast(1))
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9), 
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+        .orFinally(AfterWatermark.pastEndOfWindow())
+        .getWatermarkThatGuaranteesFiring(window));
+    assertEquals(new Instant(9),
+        AfterPane.elementCountAtLeast(100)
+            .orFinally(AfterWatermark.pastEndOfWindow())
+            .getWatermarkThatGuaranteesFiring(window));
+
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, // neither subtrigger is watermark-based
+        Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+        .orFinally(AfterPane.elementCountAtLeast(10))
+        .getWatermarkThatGuaranteesFiring(window));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
+    OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
+    Trigger aOrFinallyB = triggerA.orFinally(triggerB); // checked in both operand orders
+    Trigger bOrFinallyA = triggerB.orFinally(triggerA);
+    assertEquals(
+        Repeatedly.forever(
+            
triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
+        aOrFinallyB.getContinuationTrigger());
+    assertEquals(
+        Repeatedly.forever(
+            
triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
+        bOrFinallyA.getContinuationTrigger());
+  }
+
+  @Test
+  public void testToString() {
+    Trigger trigger = 
StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
+    assertEquals("t1.orFinally(t2)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
new file mode 100644
index 0000000..6e8930d
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link Repeatedly}.
+ */
+@RunWith(JUnit4.class)
+public class RepeatedlyTest {
+
+  @Mock private Trigger mockTrigger;
+  private SimpleTriggerTester<IntervalWindow> tester;
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+
+  public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws 
Exception {
+    MockitoAnnotations.initMocks(this);
+    tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), 
windowFn);
+  }
+
+  /**
+   * Tests that onElement correctly passes the data on to the subtrigger.
+   */
+  @Test
+  public void testOnElement() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(37);
+    verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
+  }
+
+  /**
+   * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
+   */
+  @Test
+  public void testShouldFire() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+
+    when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
+        .thenReturn(false);
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
+  }
+
+  /**
+   * Tests that the watermark that guarantees firing is that of the subtrigger.
+   */
+  @Test
+  public void testFireDeadline() throws Exception {
+    setUp(FixedWindows.of(Duration.millis(10)));
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    Instant arbitraryInstant = new Instant(34957849);
+
+    
when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
+        .thenReturn(arbitraryInstant);
+
+    assertThat(
+        
Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
+        equalTo(arbitraryInstant));
+  }
+
+  @Test
+  public void testContinuation() throws Exception {
+    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+    Trigger repeatedly = Repeatedly.forever(trigger);
+    assertEquals(
+        Repeatedly.forever(trigger.getContinuationTrigger()), 
repeatedly.getContinuationTrigger());
+    assertEquals(
+        
Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
+        repeatedly.getContinuationTrigger().getContinuationTrigger());
+  }
+
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerTester.forTrigger(
+        Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them; the merged window now holds both elements, so the trigger should 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.of(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window)); // five elements seen: the element-count branch is met
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window)); // not ready again straight after firing
+  }
+
+  @Test
+  public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                AfterFirst.of(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15)),
+                    AfterPane.elementCountAtLeast(5))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window)); // only one element, so the processing-time branch is met
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window)); // not ready again straight after firing
+  }
+
+  @Test
+  public void testRepeatedlyElementCount() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2, 3, 4, 5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window)); // not ready again straight after firing
+  }
+
+  @Test
+  public void testRepeatedlyProcessingTime() throws Exception {
+    SimpleTriggerTester<GlobalWindow> tester =
+        TriggerTester.forTrigger(
+            Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(Duration.standardMinutes(15))),
+            new GlobalWindows());
+
+    GlobalWindow window = GlobalWindow.INSTANCE;
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.shouldFire(window)); // not ready again straight after firing
+  }
+
+
+  @Test
+  public void testToString() {
+    Trigger trigger = Repeatedly.forever(new StubTrigger() {
+        @Override
+        public String toString() {
+          return "innerTrigger";
+        }
+      });
+
+    assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
new file mode 100644
index 0000000..b258a79
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.joda.time.Instant;
+
+/**
+ * No-op {@link Trigger.OnceTrigger} implementation for testing.
+ */
+abstract class StubTrigger extends Trigger.OnceTrigger {
+  /**
+   * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
+   */
+  static StubTrigger named(final String name) {
+    return new StubTrigger() {
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  protected StubTrigger() {
+    super(Lists.<Trigger>newArrayList()); // empty sub-trigger list
+  }
+
+  @Override
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+  }
+
+  @Override
+  public boolean shouldFire(TriggerContext context) throws Exception {
+    return false; // the stub never reports that it is ready to fire
+  }
+
+  @Override
+  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
+    return null; // the stub supplies no continuation trigger
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    return null; // the stub supplies no firing deadline
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
new file mode 100644
index 0000000..cfc03b2
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Trigger}, covering {@code toString()} and {@code isCompatible}.
+ */
+@RunWith(JUnit4.class)
+public class TriggerTest {
+
+  @Test
+  public void testTriggerToString() throws Exception {
+    assertEquals("AfterWatermark.pastEndOfWindow()", 
AfterWatermark.pastEndOfWindow().toString());
+    assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
+        Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
+  }
+
+  @Test
+  public void testIsCompatible() throws Exception {
+    assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); // same class, no subtriggers
+    assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
+        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
+
+    assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); // different classes
+    assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
+        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
+  }
+
+  private static class Trigger1 extends Trigger { // no-op trigger used only by testIsCompatible
+
+    private Trigger1(List<Trigger> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public void onElement(Trigger.OnElementContext c) { }
+
+    @Override
+    public void onMerge(Trigger.OnMergeContext c) { }
+
+    @Override
+    protected Trigger getContinuationTrigger(
+        List<Trigger> continuationTriggers) {
+      return null;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return null;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      return false;
+    }
+
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception { }
+  }
+
+  private static class Trigger2 extends Trigger { // same no-op body as Trigger1, distinct class
+
+    private Trigger2(List<Trigger> subTriggers) {
+      super(subTriggers);
+    }
+
+    @Override
+    public void onElement(Trigger.OnElementContext c) { }
+
+    @Override
+    public void onMerge(Trigger.OnMergeContext c) { }
+
+    @Override
+    protected Trigger getContinuationTrigger(
+        List<Trigger> continuationTriggers) {
+      return null;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return null;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
+      return false;
+    }
+
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
new file mode 100644
index 0000000..5fe17ad
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Test utility that runs a {@link Trigger}, using in-memory stub 
implementation to provide
+ * the {@link StateInternals}.
+ *
+ * @param <W> The type of windows being used.
+ */
+public class TriggerTester<InputT, W extends BoundedWindow> {
+
+  /**
+   * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps
+   * can be conflated. Today, triggers should not observe the element type, so this is the
+   * only trigger tester that needs to be used.
+   */
+  public static class SimpleTriggerTester<W extends BoundedWindow>
+      extends TriggerTester<Integer, W> {
+
+    private SimpleTriggerTester(WindowingStrategy<Object, W> 
windowingStrategy) throws Exception {
+      super(windowingStrategy);
+    }
+
+    public void injectElements(int... values) throws Exception {
+      List<TimestampedValue<Integer>> timestampedValues =
+          Lists.newArrayListWithCapacity(values.length);
+      for (int value : values) {
+        // each int serves as both the element value and its timestamp
+        timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
+      }
+      injectElements(timestampedValues);
+    }
+
+    public SimpleTriggerTester<W> withAllowedLateness(Duration 
allowedLateness) throws Exception {
+      return new SimpleTriggerTester<>( // a new tester; this instance is left unchanged
+          windowingStrategy.withAllowedLateness(allowedLateness));
+    }
+  }
+
+  protected final WindowingStrategy<Object, W> windowingStrategy;
+
+  private final TestInMemoryStateInternals<?> stateInternals =
+      new TestInMemoryStateInternals<Object>(null /* key */);
+  private final InMemoryTimerInternals timerInternals = new 
InMemoryTimerInternals();
+  private final TriggerContextFactory<W> contextFactory;
+  private final WindowFn<Object, W> windowFn;
+  private final ActiveWindowSet<W> activeWindows;
+  private final Map<W, W> windowToMergeResult;
+
+  /**
+   * An {@link ExecutableTrigger} built from the {@link Trigger}
+   * under test.
+   */
+  private final ExecutableTrigger executableTrigger;
+
+  /**
+   * A map from a window to the {@link FinishedTriggers} recording finished triggers for it.
+   */
+  private final Map<W, FinishedTriggers> finishedSets;
+
+  public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
+      Trigger trigger, WindowFn<Object, W> windowFn)
+          throws Exception {
+    WindowingStrategy<Object, W> windowingStrategy =
+        WindowingStrategy.of(windowFn).withTrigger(trigger)
+        // Merging requires accumulation mode or early firings can break up a 
session.
+        // Not currently an issue with the tester (because we never GC) but we 
don't want
+        // mystery failures due to violating this need.
+        .withMode(windowFn.isNonMerging() // non-merging: discarding; merging: accumulating
+            ? AccumulationMode.DISCARDING_FIRED_PANES
+            : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+    return new SimpleTriggerTester<>(windowingStrategy); // tester over Integer elements
+  }
+
+  public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> 
forAdvancedTrigger(
+      Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
+    WindowingStrategy<Object, W> strategy =
+        WindowingStrategy.of(windowFn).withTrigger(trigger)
+        // Merging requires accumulation mode or early firings can break up a 
session.
+        // Not currently an issue with the tester (because we never GC) but we 
don't want
+        // mystery failures due to violating this need.
+        .withMode(windowFn.isNonMerging() // non-merging: discarding; merging: accumulating
+            ? AccumulationMode.DISCARDING_FIRED_PANES
+            : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+    return new TriggerTester<>(strategy); // element type InputT is chosen by the caller
+  }
+
+  protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) 
throws Exception {
+    this.windowingStrategy = windowingStrategy;
+    this.windowFn = windowingStrategy.getWindowFn();
+    this.executableTrigger = windowingStrategy.getTrigger();
+    this.finishedSets = new HashMap<>();
+
+    this.activeWindows = // implementation chosen by whether the window function merges
+        windowFn.isNonMerging()
+            ? new NonMergingActiveWindowSet<W>()
+            : new MergingActiveWindowSet<W>(windowFn, stateInternals);
+    this.windowToMergeResult = new HashMap<>();
+
+    this.contextFactory = // shares the in-memory state and the active-window set chosen above
+        new TriggerContextFactory<>(windowingStrategy.getWindowFn(), 
stateInternals, activeWindows);
+  }
+
+  /**
+   * Instructs the trigger under test to clear its state for the given window.
+   */
+  public void clearState(W window) throws Exception {
+    executableTrigger.invokeClear(contextFactory.base(window,
+        new TestTimers(windowNamespace(window)), executableTrigger, 
getFinishedSet(window)));
+  }
+
+  /**
+   * Asserts that the trigger has actually cleared all of its state for the given window. Since
+   * the trigger under test is the root, this makes the assert for all triggers regardless
+   * of their position in the trigger tree.
+   */
+  public void assertCleared(W window) {
+    for (StateNamespace untypedNamespace : 
stateInternals.getNamespacesInUse()) {
+      if (untypedNamespace instanceof WindowAndTriggerNamespace) { // other namespaces are skipped
+        @SuppressWarnings("unchecked")
+        WindowAndTriggerNamespace<W> namespace = 
(WindowAndTriggerNamespace<W>) untypedNamespace;
+        if (namespace.getWindow().equals(window)) {
+          Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
+          assertTrue("Trigger has not cleared tags: " + tagsInUse, 
tagsInUse.isEmpty());
+        }
+      }
+    }
+  }
+
+  /**
+   * Reports whether the {@link Trigger} under test has been marked finished for {@code window}.
+   *
+   * @return {@code false} when no finished set has been recorded for the window yet; otherwise
+   *     whatever that finished set reports for the root trigger
+   */
+  public boolean isMarkedFinished(W window) {
+    // Deliberately a plain lookup: querying must not create a finished set as a side effect.
+    FinishedTriggers recorded = finishedSets.get(window);
+    return recorded != null && recorded.isFinished(executableTrigger);
+  }
+
+  /**
+   * Builds the window-level {@link StateNamespace} for {@code window} using the window
+   * function's window coder. The window is passed through {@code checkNotNull} first, so a
+   * null window is rejected here rather than producing a namespace.
+   */
+  private StateNamespace windowNamespace(W window) {
+    return StateNamespaces.window(windowFn.windowCoder(), 
checkNotNull(window));
+  }
+
+  /**
+   * Advances the input watermark held by the timer internals to {@code newInputWatermark}.
+   *
+   * <p>The {@code TimerCallback.NO_OP} callback is supplied, so this method does not forward
+   * timer firings to the trigger under test.
+   */
+  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    TimerCallback ignoreFirings = TimerCallback.NO_OP;
+    timerInternals.advanceInputWatermark(ignoreFirings, newInputWatermark);
+  }
+
+  /**
+   * Advances the processing time held by the timer internals to {@code newProcessingTime}.
+   *
+   * <p>The {@code TimerCallback.NO_OP} callback is supplied, so this method does not forward
+   * timer firings to the trigger under test.
+   */
+  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+    TimerCallback ignoreFirings = TimerCallback.NO_OP;
+    timerInternals.advanceProcessingTime(ignoreFirings, newProcessingTime);
+  }
+
+  /**
+   * Varargs convenience form: injects all the timestamped values (after passing through the
+   * window function) as if they arrived in a single chunk of a bundle (or work-unit).
+   */
+  @SafeVarargs
+  public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+    List<TimestampedValue<InputT>> batch = Arrays.asList(values);
+    injectElements(batch);
+  }
+
+  /**
+   * Injects {@code values} as one batch.
+   *
+   * <p>Each value is first traced, then assigned to windows by the window function. Every
+   * assigned window is registered as active and gets an event-time timer at its maximum
+   * timestamp. Only after all values have been assigned is the trigger's {@code onElement}
+   * invoked, once per (value, window) pair, skipping windows whose trigger is already finished.
+   *
+   * @throws RuntimeException wrapping any exception raised while assigning windows or setting
+   *     timers for a value
+   */
+  public final void injectElements(Collection<TimestampedValue<InputT>> values)
+      throws Exception {
+    for (TimestampedValue<InputT> traced : values) {
+      WindowTracing.trace("TriggerTester.injectElements: {}", traced);
+    }
+
+    // Phase 1: window assignment, activation and timer registration for the whole batch.
+    List<WindowedValue<InputT>> assigned = Lists.newArrayListWithCapacity(values.size());
+    for (TimestampedValue<InputT> element : values) {
+      try {
+        InputT payload = element.getValue();
+        Instant eventTime = element.getTimestamp();
+        TestAssignContext<W> assignContext =
+            new TestAssignContext<W>(windowFn, payload, eventTime, GlobalWindow.INSTANCE);
+        Collection<W> windows = windowFn.assignWindows(assignContext);
+
+        for (W assignedWindow : windows) {
+          activeWindows.addActiveForTesting(assignedWindow);
+
+          // Today, triggers assume onTimer firing at the watermark time, whether or not they
+          // explicitly set the timer themselves. So this tester must set it.
+          TimerData endOfWindow =
+              TimerData.of(
+                  windowNamespace(assignedWindow),
+                  assignedWindow.maxTimestamp(),
+                  TimeDomain.EVENT_TIME);
+          timerInternals.setTimer(endOfWindow);
+        }
+
+        assigned.add(WindowedValue.of(payload, eventTime, windows, PaneInfo.NO_FIRING));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Phase 2: deliver each element to the trigger, once per window it landed in.
+    for (WindowedValue<InputT> delivered : assigned) {
+      for (BoundedWindow rawWindow : delivered.getWindows()) {
+        // SDK is responsible for type safety
+        @SuppressWarnings("unchecked")
+        W target = mergeResult((W) rawWindow);
+
+        Trigger.OnElementContext elementContext =
+            contextFactory.createOnElementContext(
+                target,
+                new TestTimers(windowNamespace(target)),
+                delivered.getTimestamp(),
+                executableTrigger,
+                getFinishedSet(target));
+
+        if (elementContext.trigger().isFinished()) {
+          continue;
+        }
+        executableTrigger.invokeOnElement(elementContext);
+      }
+    }
+  }
+
+  /**
+   * Asks the trigger under test whether it should fire for {@code window}. The trigger spec's
+   * {@code prefetchShouldFire} is called on the context's state before the check itself.
+   */
+  public boolean shouldFire(W window) throws Exception {
+    TestTimers timers = new TestTimers(windowNamespace(window));
+    FinishedTriggers finished = getFinishedSet(window);
+    Trigger.TriggerContext triggerContext =
+        contextFactory.base(window, timers, executableTrigger, finished);
+
+    executableTrigger.getSpec().prefetchShouldFire(triggerContext.state());
+    return executableTrigger.invokeShouldFire(triggerContext);
+  }
+
+  /**
+   * Fires the trigger under test for {@code window} if, and only if, it reports that it should
+   * fire. When the trigger is finished after firing, the window is removed from the active set
+   * and the trigger's state is cleared.
+   */
+  public void fireIfShouldFire(W window) throws Exception {
+    TestTimers timers = new TestTimers(windowNamespace(window));
+    FinishedTriggers finished = getFinishedSet(window);
+    Trigger.TriggerContext triggerContext =
+        contextFactory.base(window, timers, executableTrigger, finished);
+
+    executableTrigger.getSpec().prefetchShouldFire(triggerContext.state());
+    if (!executableTrigger.invokeShouldFire(triggerContext)) {
+      return;
+    }
+
+    executableTrigger.getSpec().prefetchOnFire(triggerContext.state());
+    executableTrigger.invokeOnFire(triggerContext);
+
+    // A trigger that finished by firing no longer needs its window or its state.
+    if (triggerContext.trigger().isFinished()) {
+      activeWindows.remove(window);
+      executableTrigger.invokeClear(triggerContext);
+    }
+  }
+
+  public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, 
boolean value) {
+    
getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex),
 value);
+  }
+
+  /**
+   * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
+   * events on to the trigger under test. Does not persist the fact that merging happened,
+   * since it is just to test the trigger's {@code OnMerge} method.
+   *
+   * <p>The window-to-merge-result mapping consulted by {@code mergeResult} is reset at the
+   * start of every call and refilled from the merge events of this call.
+   */
+  public final void mergeWindows() throws Exception {
+    windowToMergeResult.clear();
+    activeWindows.merge(new MergeCallback<W>() {
+      @Override
+      public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+        // Nothing to prefetch.
+      }
+
+      @Override
+      public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+        // Record the merge target for every source window; keep the active ones aside.
+        List<W> stillActive = new ArrayList<W>();
+        for (W source : toBeMerged) {
+          windowToMergeResult.put(source, mergeResult);
+          if (activeWindows.isActive(source)) {
+            stillActive.add(source);
+          }
+        }
+
+        // Only active source windows contribute their finished sets to the merge context.
+        Map<W, FinishedTriggers> finishedBySource =
+            Maps.newHashMapWithExpectedSize(stillActive.size());
+        for (W source : stillActive) {
+          finishedBySource.put(source, getFinishedSet(source));
+        }
+
+        TestTimers mergedTimers = new TestTimers(windowNamespace(mergeResult));
+        executableTrigger.invokeOnMerge(
+            contextFactory.createOnMergeContext(
+                mergeResult,
+                mergedTimers,
+                executableTrigger,
+                getFinishedSet(mergeResult),
+                finishedBySource));
+
+        // The merged window gets its own event-time timer at its maximum timestamp.
+        TimerData endOfMergedWindow =
+            TimerData.of(
+                windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME);
+        timerInternals.setTimer(endOfMergedWindow);
+      }
+    });
+  }
+
+  /**
+   * Returns the window that {@code window} was merged into by the most recent
+   * {@code mergeWindows} call, or {@code window} itself when no merge result is recorded for it.
+   */
+  public W mergeResult(W window) {
+    W merged = windowToMergeResult.get(window);
+    if (merged != null) {
+      return merged;
+    }
+    return window;
+  }
+
+  /**
+   * Returns the finished-triggers set tracked for {@code window}, creating and registering an
+   * empty one on first use.
+   */
+  private FinishedTriggers getFinishedSet(W window) {
+    FinishedTriggers existing = finishedSets.get(window);
+    if (existing != null) {
+      return existing;
+    }
+
+    FinishedTriggers created = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+    finishedSets.put(window, created);
+    return created;
+  }
+
+  /**
+   * Minimal {@link WindowFn.AssignContext} for tests: it simply hands back the element,
+   * timestamp and window it was constructed with.
+   */
+  private static class TestAssignContext<W extends BoundedWindow>
+      extends WindowFn<Object, W>.AssignContext {
+    // Assigned once in the constructor and never mutated, hence final.
+    private final Object element;
+    private final Instant timestamp;
+    private final BoundedWindow window;
+
+    /**
+     * @param windowFn the window function this context belongs to (the enclosing instance of
+     *     the inner {@code AssignContext} class)
+     * @param element the value returned by {@link #element()}
+     * @param timestamp the value returned by {@link #timestamp()}
+     * @param window the value returned by {@link #window()}
+     */
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
+      // AssignContext is an inner class of WindowFn, so it needs an enclosing instance.
+      windowFn.super();
+      this.element = element;
+      this.timestamp = timestamp;
+      this.window = window;
+    }
+
+    @Override
+    public Object element() {
+      return element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return window;
+    }
+  }
+
+  private class TestTimers implements Timers {
+    private final StateNamespace namespace;
+
+    public TestTimers(StateNamespace namespace) {
+      checkArgument(namespace instanceof WindowNamespace);
+      this.namespace = namespace;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, 
timeDomain));
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+  }
+}

Reply via email to