Repository: incubator-beam Updated Branches: refs/heads/master 73226168a -> e969f3d38
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java new file mode 100644 index 0000000..119c937 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests the {@link AfterWatermarkStateMachine} triggers. + */ +@RunWith(JUnit4.class) +public class AfterWatermarkStateMachineTest { + + @Mock private OnceTriggerStateMachine mockEarly; + @Mock private OnceTriggerStateMachine mockLate; + + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + private static TriggerStateMachine.TriggerContext anyTriggerContext() { + return Mockito.<TriggerStateMachine.TriggerContext>any(); + } + private static TriggerStateMachine.OnElementContext anyElementContext() { + return Mockito.<TriggerStateMachine.OnElementContext>any(); + } + + private void injectElements(int... 
elements) throws Exception { + for (int element : elements) { + doNothing().when(mockEarly).onElement(anyElementContext()); + doNothing().when(mockLate).onElement(anyElementContext()); + tester.injectElements(element); + } + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + public void testRunningAsTrigger(OnceTriggerStateMachine mockTrigger, IntervalWindow window) + throws Exception { + + // Don't fire due to mock saying no + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + assertFalse(tester.shouldFire(window)); // not ready + + // Fire due to mock trigger; early trigger is required to be a OnceTrigger + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // ready + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + @Test + public void testEarlyAndAtWatermark() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(mockEarly), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testAtWatermarkAndLate() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // No early firing, just double checking + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true); + 
assertFalse(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + @Test + public void testEarlyAndAtWatermarkAndLate() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(mockEarly) + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + /** + * Tests that if the end-of-window (EOW) trigger is finished in both windows being merged, and + * the watermark has also passed the end of the merged window, it stays finished after merging. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness.
+ */ + @Test + public void testOnMergeAlreadyFinished() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + AfterWatermarkStateMachine.pastEndOfWindow(), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it finished + tester.mergeWindows(); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testOnMergeRewinds() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + AfterWatermarkStateMachine.pastEndOfWindow(), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the watermark trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that if the end-of-window (EOW) part of the trigger is finished in both windows being + * merged, then the merged window stays on the late trigger. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness.
+ */ + @Test + public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(AfterPaneStateMachine.elementCountAtLeast(100)) + .withLateFirings(AfterPaneStateMachine.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it on the late trigger + tester.mergeWindows(); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testEarlyAndLateOnMergeRewinds() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(AfterPaneStateMachine.elementCountAtLeast(100)) + .withLateFirings(AfterPaneStateMachine.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the early trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testFromEndOfWindowToString() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow(); + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } + + @Test + public void testEarlyFiringsToString() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow() +
.withEarlyFirings(StubTriggerStateMachine.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString()); + } + + @Test + public void testLateFiringsToString() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow() + .withLateFirings(StubTriggerStateMachine.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); + } + + @Test + public void testEarlyAndLateFiringsToString() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(StubTriggerStateMachine.named("t1")) + .withLateFirings(StubTriggerStateMachine.named("t2")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", + trigger.toString()); + } + + @Test + public void testToStringExcludesNeverTrigger() { + TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(NeverStateMachine.ever()) + .withLateFirings(NeverStateMachine.ever()); + + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java new file mode 100644 index 0000000..b11d319 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/DefaultTriggerStateMachineTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link DefaultTriggerStateMachine}, which should be equivalent to + * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. 
+ */ +@RunWith(JUnit4.class) +public class DefaultTriggerStateMachineTest { + + SimpleTriggerStateMachineTester<IntervalWindow> tester; + + @Test + public void testDefaultTriggerFixedWindows() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + DefaultTriggerStateMachine.of(), + FixedWindows.of(Duration.millis(100))); + + tester.injectElements( + 1, // [0, 100) + 101); // [100, 200) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200)); + + // Advance the watermark almost to the end of the first window. + tester.advanceInputWatermark(new Instant(99)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark past end of the first window, which is then ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Fire, but the first window is still allowed to fire + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark to 200, then both are ready + tester.advanceInputWatermark(new Instant(200)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + } + + @Test + public void testDefaultTriggerSlidingWindows() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + DefaultTriggerStateMachine.of(), + SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); + + tester.injectElements( + 1, // [-50, 50), [0, 100) + 50); // [0, 100), [50, 150) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new 
Instant(100)); + IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150)); + + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 50, the first becomes ready; it stays ready after firing + tester.advanceInputWatermark(new Instant(50)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 99, the first is still the only one ready + tester.advanceInputWatermark(new Instant(99)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 100, the first and second are ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + assertFalse(tester.isMarkedFinished(thirdWindow)); + } + + @Test + public void testDefaultTriggerSessions() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + DefaultTriggerStateMachine.of(), + Sessions.withGapDuration(Duration.millis(100))); + + tester.injectElements( + 1, // [1, 101) + 50); // [50, 150) + tester.mergeWindows(); + + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150)); + + // Not ready in any window yet + tester.advanceInputWatermark(new 
Instant(100)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // The first window is "ready": the caller owns knowledge of which windows are merged away + tester.advanceInputWatermark(new Instant(149)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now ready on all windows + tester.advanceInputWatermark(new Instant(150)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertTrue(tester.shouldFire(mergedWindow)); + + // Ensure it repeats + tester.fireIfShouldFire(mergedWindow); + assertTrue(tester.shouldFire(mergedWindow)); + + assertFalse(tester.isMarkedFinished(mergedWindow)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java new file mode 100644 index 0000000..744c220 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachineTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.util.Arrays; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ExecutableTriggerStateMachine}. + */ +@RunWith(JUnit4.class) +public class ExecutableTriggerStateMachineTest { + + @Test + public void testIndexAssignmentLeaf() throws Exception { + StubStateMachine t1 = new StubStateMachine(); + ExecutableTriggerStateMachine executable = ExecutableTriggerStateMachine.create(t1); + assertEquals(0, executable.getTriggerIndex()); + } + + @Test + public void testIndexAssignmentOneLevel() throws Exception { + StubStateMachine t1 = new StubStateMachine(); + StubStateMachine t2 = new StubStateMachine(); + StubStateMachine t = new StubStateMachine(t1, t2); + + ExecutableTriggerStateMachine executable = ExecutableTriggerStateMachine.create(t); + + assertEquals(0, executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertSame(t1, executable.subTriggers().get(0).getSpec()); + assertEquals(2, executable.subTriggers().get(1).getTriggerIndex()); + assertSame(t2, executable.subTriggers().get(1).getSpec()); + } + + @Test + public void testIndexAssignmentTwoLevel() throws Exception { + StubStateMachine t11 = new StubStateMachine(); + StubStateMachine t12 = new StubStateMachine(); + StubStateMachine t13 = new StubStateMachine(); + StubStateMachine t14 = new StubStateMachine(); + StubStateMachine 
t21 = new StubStateMachine(); + StubStateMachine t22 = new StubStateMachine(); + StubStateMachine t1 = new StubStateMachine(t11, t12, t13, t14); + StubStateMachine t2 = new StubStateMachine(t21, t22); + StubStateMachine t = new StubStateMachine(t1, t2); + + ExecutableTriggerStateMachine executable = ExecutableTriggerStateMachine.create(t); + + assertEquals(0, executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree()); + assertEquals(6, executable.subTriggers().get(1).getTriggerIndex()); + + assertSame(t1, executable.getSubTriggerContaining(1).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(6).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(2).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(3).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(5).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(7).getSpec()); + } + + private static class StubStateMachine extends TriggerStateMachine { + + @SafeVarargs + protected StubStateMachine(TriggerStateMachine... 
subTriggers) { + super(Arrays.asList(subTriggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { } + + @Override + public void onMerge(OnMergeContext c) throws Exception { } + + @Override + public void clear(TriggerContext c) throws Exception { + } + + @Override + public boolean shouldFire(TriggerContext c) { + return false; + } + + @Override + public void onFire(TriggerContext c) { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java new file mode 100644 index 0000000..16bf49d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSetTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersBitSet}. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersBitSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. + */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. + */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10); + assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java new file mode 100644 index 0000000..31d17c1 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersProperties.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Generalized tests for {@link FinishedTriggers} implementations. + * + * <p>Each static check takes the {@link FinishedTriggers} under test, so implementation-specific + * tests can share the same assertions. + */ +public class FinishedTriggersProperties { + /** + * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set + * finished, it is correctly reported as finished. + * + * <p>Also asserts that the trigger is not finished before it is set, so {@code trigger} must be + * unfinished in {@code finishedSet} on entry. + */ + public static void verifyGetAfterSet( + FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) { + assertFalse(finishedSet.isFinished(trigger)); + finishedSet.setFinished(trigger, true); + assertTrue(finishedSet.isFinished(trigger)); + } + + /** + * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly + * reported as finished. 
+ */ + public static void verifyGetAfterSet(FinishedTriggers finishedSet) { + ExecutableTriggerStateMachine trigger = + ExecutableTriggerStateMachine.create( + AfterAllStateMachine.of( + AfterFirstStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(3), + AfterWatermarkStateMachine.pastEndOfWindow()), + AfterAllStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(10), + AfterProcessingTimeStateMachine.pastFirstElementInPane()))); + + verifyGetAfterSet(finishedSet, trigger); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0)); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. + */ + public static void verifyClearRecursively(FinishedTriggers finishedSet) { + ExecutableTriggerStateMachine trigger = + ExecutableTriggerStateMachine.create( + AfterAllStateMachine.of( + AfterFirstStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(3), + AfterWatermarkStateMachine.pastEndOfWindow()), + AfterAllStateMachine.of( + AfterPaneStateMachine.elementCountAtLeast(10), + AfterProcessingTimeStateMachine.pastFirstElementInPane()))); + + // Set them all finished. This method is not on a trigger as it makes no sense outside tests. 
+ setFinishedRecursively(finishedSet, trigger); + assertTrue(finishedSet.isFinished(trigger)); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1))); + + // Clear just the second AfterAll + finishedSet.clearRecursively(trigger.subTriggers().get(1)); + + // Check that the root and the first subtrigger's subtree are still finished; the cleared one is not + assertTrue(finishedSet.isFinished(trigger)); + verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0)); + verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1)); + } + + private static void setFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) { + finishedSet.setFinished(trigger, true); + for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) { + setFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) { + assertTrue(finishedSet.isFinished(trigger)); + for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) { + verifyFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyUnfinishedRecursively( + FinishedTriggers finishedSet, ExecutableTriggerStateMachine trigger) { + assertFalse(finishedSet.isFinished(trigger)); + for (ExecutableTriggerStateMachine subTrigger : trigger.subTriggers()) { + verifyUnfinishedRecursively(finishedSet, subTrigger); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java new file mode 100644 index 0000000..fae73f3 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/FinishedTriggersSetTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import java.util.HashSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersSet}. + * + * <p>The set/get and recursive-clear checks are delegated to {@link FinishedTriggersProperties}; + * {@code testCopy} checks that {@code copy()} is backed by a different set instance. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. + */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>())); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. 
+ */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>())); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersSet finishedSet = + FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>()); + assertThat(finishedSet.copy().getFinishedTriggers(), + not(theInstance(finishedSet.getFinishedTriggers()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java new file mode 100644 index 0000000..6d8a344 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/NeverStateMachineTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link NeverStateMachine}. + * + * <p>Checks that the trigger does not report that it should fire, both after an element arrives + * and after the input watermark advances to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + */ +@RunWith(JUnit4.class) +public class NeverStateMachineTest { + private SimpleTriggerStateMachineTester<IntervalWindow> triggerTester; + + @Before + public void setup() throws Exception { + triggerTester = + TriggerStateMachineTester.forTrigger( + NeverStateMachine.ever(), FixedWindows.of(Duration.standardMinutes(5))); + } + + @Test + public void falseAfterEndOfWindow() throws Exception { + triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); + IntervalWindow window = + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); + assertThat(triggerTester.shouldFire(window), is(false)); + triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(triggerTester.shouldFire(window), is(false)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java new file mode 100644 index 
0000000..6e093c5 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachineTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link OrFinallyStateMachine}. + */ +@RunWith(JUnit4.class) +public class OrFinallyStateMachineTest { + + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires and finishes, the {@code OrFinally} also fires and finishes. 
+ */ + @Test + public void testActualFiresAndFinishes() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + new OrFinallyStateMachine( + AfterPaneStateMachine.elementCountAtLeast(2), + AfterPaneStateMachine.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires and finishes + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires but does not finish, the {@code OrFinally} also fires and also does not + * finish. + */ + @Test + public void testActualFiresOnly() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + new OrFinallyStateMachine( + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)), + AfterPaneStateMachine.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but does not finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // And again + tester.injectElements(3, 4); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + /** + * Tests that if the first trigger rewinds to be non-finished in the merged window, + * then it becomes the currently active trigger again, with real triggers. 
+ */ + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + AfterEachStateMachine.inOrder( + AfterPaneStateMachine.elementCountAtLeast(5) + .orFinally(AfterWatermarkStateMachine.pastEndOfWindow()), + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + // Finished the orFinally in the first window + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Set up second window where it is not done + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now adding 5 more elements makes the main trigger ready to fire + tester.injectElements(1, 2, 3, 4, 5); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that for {@code OrFinally(actual, until)} when {@code actual} + * fires but does not finish, then {@code until} fires and finishes, the + * whole thing fires and finishes. 
+ */ + @Test + public void testActualFiresButUntilFinishes() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + new OrFinallyStateMachine( + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)), + AfterPaneStateMachine.elementCountAtLeast(3)), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // Before any firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but doesn't finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // The until fires and finishes; the trigger is finished + tester.injectElements(3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testToString() { + TriggerStateMachine trigger = + StubTriggerStateMachine.named("t1").orFinally(StubTriggerStateMachine.named("t2")); + assertEquals("t1.orFinally(t2)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java new file mode 100644 index 0000000..b52f41d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/RepeatedlyStateMachineTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link RepeatedlyStateMachine}. 
+ */ +@RunWith(JUnit4.class) +public class RepeatedlyStateMachineTest { + + @Mock private TriggerStateMachine mockTrigger; + private SimpleTriggerStateMachineTester<IntervalWindow> tester; + private static TriggerStateMachine.TriggerContext anyTriggerContext() { + return Mockito.<TriggerStateMachine.TriggerContext>any(); + } + + public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception { + MockitoAnnotations.initMocks(this); + tester = TriggerStateMachineTester + .forTrigger(RepeatedlyStateMachine.forever(mockTrigger), windowFn); + } + + /** + * Tests that onElement correctly passes the data on to the subtrigger. + */ + @Test + public void testOnElement() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + tester.injectElements(37); + verify(mockTrigger).onElement(Mockito.<TriggerStateMachine.OnElementContext>any()); + } + + /** + * Tests that the repeatedly is ready to fire whenever the subtrigger is ready. + */ + @Test + public void testShouldFire() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + + when(mockTrigger.shouldFire(Mockito.<TriggerStateMachine.TriggerContext>any())) + .thenReturn(false); + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + } + + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerStateMachineTester.forTrigger( + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + 
// Merge them; the merged window holds both elements, so the repeated trigger is ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerStateMachineTester<GlobalWindow> tester = + TriggerStateMachineTester.forTrigger( + RepeatedlyStateMachine.forever( + AfterFirstStateMachine.of( + AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPaneStateMachine.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerStateMachineTester<GlobalWindow> tester = + TriggerStateMachineTester.forTrigger( + RepeatedlyStateMachine.forever( + AfterFirstStateMachine.of( + AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPaneStateMachine.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception { + SimpleTriggerStateMachineTester<GlobalWindow> tester = + TriggerStateMachineTester.forTrigger( + RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = 
GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerStateMachineTester<GlobalWindow> tester = + TriggerStateMachineTester.forTrigger( + RepeatedlyStateMachine.forever( + AfterProcessingTimeStateMachine.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + + @Test + public void testToString() { + TriggerStateMachine trigger = RepeatedlyStateMachine.forever(new StubTriggerStateMachine() { + @Override + public String toString() { + return "innerTrigger"; + } + }); + + assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java new file mode 100644 index 0000000..ef74bb5 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/ReshuffleTriggerStateMachineTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ReshuffleTriggerStateMachine}. + */ +@RunWith(JUnit4.class) +public class ReshuffleTriggerStateMachineTest { + + /** Public so that other tests can instantiate {@link ReshuffleTriggerStateMachine}. NOTE(review): the type parameter {@code W} is not used by the signature or the body. 
*/ + public static <W extends BoundedWindow> ReshuffleTriggerStateMachine forTest() { + return new ReshuffleTriggerStateMachine(); + } + + @Test + public void testShouldFire() throws Exception { + TriggerStateMachineTester<Integer, IntervalWindow> tester = + TriggerStateMachineTester.forTrigger( + new ReshuffleTriggerStateMachine(), FixedWindows.of(Duration.millis(100))); + IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400)); + assertTrue(tester.shouldFire(arbitraryWindow)); + } + + @Test + public void testOnTimer() throws Exception { + TriggerStateMachineTester<Integer, IntervalWindow> tester = + TriggerStateMachineTester.forTrigger( + new ReshuffleTriggerStateMachine(), FixedWindows.of(Duration.millis(100))); + IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200)); + tester.fireIfShouldFire(arbitraryWindow); + assertFalse(tester.isMarkedFinished(arbitraryWindow)); + } + + @Test + public void testToString() { + TriggerStateMachine trigger = new ReshuffleTriggerStateMachine(); + assertEquals("ReshuffleTriggerStateMachine()", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java new file mode 100644 index 0000000..4512848 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.collect.Lists; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; + +/** + * No-op {@link OnceTriggerStateMachine} implementation for testing. + * + * <p>It is constructed with an empty sub-trigger list, its element, merge and firing callbacks + * have empty bodies, and {@code shouldFire} always returns {@code false}. + */ +abstract class StubTriggerStateMachine extends OnceTriggerStateMachine { + /** + * Creates a stub {@link TriggerStateMachine} instance which returns the specified name on {@link + * #toString()}. 
+ */ + static StubTriggerStateMachine named(final String name) { + return new StubTriggerStateMachine() { + @Override + public String toString() { + return name; + } + }; + } + + protected StubTriggerStateMachine() { + super(Lists.<TriggerStateMachine>newArrayList()); + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + } + + @Override + public void onElement(OnElementContext c) throws Exception { + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + } + + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java new file mode 100644 index 0000000..e06eb85 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link TriggerStateMachine}. + * + * <p>Covers {@code toString()} of built-in triggers and {@code isCompatible}, the latter using + * two minimal subclasses ({@code Trigger1} and {@code Trigger2}) whose callbacks do nothing and + * whose {@code shouldFire} returns {@code false}. + */ +@RunWith(JUnit4.class) +public class TriggerStateMachineTest { + + @Test + public void testTriggerToString() throws Exception { + assertEquals( + "AfterWatermark.pastEndOfWindow()", + AfterWatermarkStateMachine.pastEndOfWindow().toString()); + assertEquals( + "Repeatedly.forever(AfterWatermark.pastEndOfWindow())", + RepeatedlyStateMachine.forever(AfterWatermarkStateMachine.pastEndOfWindow()).toString()); + } + + @Test + public void testIsCompatible() throws Exception { + assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); + assertTrue(new Trigger1(Arrays.<TriggerStateMachine>asList(new Trigger2(null))) + .isCompatible(new Trigger1(Arrays.<TriggerStateMachine>asList(new Trigger2(null))))); + + assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); + assertFalse(new Trigger1(Arrays.<TriggerStateMachine>asList(new Trigger1(null))) + .isCompatible(new Trigger1(Arrays.<TriggerStateMachine>asList(new Trigger2(null))))); + } + + private static class Trigger1 extends TriggerStateMachine { + + private Trigger1(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(TriggerStateMachine.OnElementContext c) { } + + @Override + public void onMerge(TriggerStateMachine.OnMergeContext c) { } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } + } + + 
private static class Trigger2 extends TriggerStateMachine { + + private Trigger2(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(TriggerStateMachine.OnElementContext c) { } + + @Override + public void onMerge(TriggerStateMachine.OnMergeContext c) { } + + @Override + public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java new file mode 100644 index 0000000..1ccca17 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ActiveWindowSet; +import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; +import org.apache.beam.sdk.util.MergingActiveWindowSet; +import org.apache.beam.sdk.util.NonMergingActiveWindowSet; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.Timers; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Test utility that runs 
a {@link TriggerStateMachine}, using in-memory stub implementation to + * provide the {@link StateInternals}. + * + * @param <W> The type of windows being used. + */ +public class TriggerStateMachineTester<InputT, W extends BoundedWindow> { + + /** + * A {@link TriggerStateMachineTester} specialized to {@link Integer} values, so elements and + * timestamps can be conflated. Today, triggers should not observed the element type, so this is + * the only trigger tester that needs to be used. + */ + public static class SimpleTriggerStateMachineTester<W extends BoundedWindow> + extends TriggerStateMachineTester<Integer, W> { + + private SimpleTriggerStateMachineTester( + ExecutableTriggerStateMachine executableTriggerStateMachine, + WindowFn<Object, W> windowFn, + Duration allowedLateness) + throws Exception { + super(executableTriggerStateMachine, windowFn, allowedLateness); + } + + public void injectElements(int... values) throws Exception { + List<TimestampedValue<Integer>> timestampedValues = + Lists.newArrayListWithCapacity(values.length); + for (int value : values) { + timestampedValues.add(TimestampedValue.of(value, new Instant(value))); + } + injectElements(timestampedValues); + } + + public SimpleTriggerStateMachineTester<W> withAllowedLateness(Duration allowedLateness) + throws Exception { + return new SimpleTriggerStateMachineTester<>( + executableTrigger, + windowFn, + allowedLateness); + } + } + + private final TestInMemoryStateInternals<?> stateInternals = + new TestInMemoryStateInternals<Object>(null /* key */); + private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + private final TriggerStateMachineContextFactory<W> contextFactory; + protected final WindowFn<Object, W> windowFn; + private final ActiveWindowSet<W> activeWindows; + private final Map<W, W> windowToMergeResult; + + /** + * An {@link ExecutableTriggerStateMachine} under test. 
+ */ + protected final ExecutableTriggerStateMachine executableTrigger; + + /** + * A map from a window and trigger to whether that trigger is finished for the window. + */ + private final Map<W, FinishedTriggers> finishedSets; + + public static <W extends BoundedWindow> SimpleTriggerStateMachineTester<W> forTrigger( + TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) + throws Exception { + + ExecutableTriggerStateMachine executableTriggerStateMachine = + ExecutableTriggerStateMachine.create(stateMachine); + + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + AccumulationMode mode = + windowFn.isNonMerging() + ? AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES; + + return new SimpleTriggerStateMachineTester<>( + executableTriggerStateMachine, windowFn, Duration.ZERO); + } + + public static <InputT, W extends BoundedWindow> + TriggerStateMachineTester<InputT, W> forAdvancedTrigger( + TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception { + ExecutableTriggerStateMachine executableTriggerStateMachine = + ExecutableTriggerStateMachine.create(stateMachine); + + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + AccumulationMode mode = + windowFn.isNonMerging() + ? 
AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES; + + return new TriggerStateMachineTester<>(executableTriggerStateMachine, windowFn, Duration.ZERO); + } + + protected TriggerStateMachineTester( + ExecutableTriggerStateMachine executableTriggerStateMachine, + WindowFn<Object, W> windowFn, + Duration allowedLateness) throws Exception { + this.windowFn = windowFn; + this.executableTrigger = executableTriggerStateMachine; + this.finishedSets = new HashMap<>(); + + this.activeWindows = + windowFn.isNonMerging() + ? new NonMergingActiveWindowSet<W>() + : new MergingActiveWindowSet<W>(windowFn, stateInternals); + this.windowToMergeResult = new HashMap<>(); + + this.contextFactory = + new TriggerStateMachineContextFactory<>( + windowFn, stateInternals, activeWindows); + } + + /** + * Instructs the trigger to clear its state for the given window. + */ + public void clearState(W window) throws Exception { + executableTrigger.invokeClear(contextFactory.base(window, + new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window))); + } + + /** + * Asserts that the trigger has actually cleared all of its state for the given window. Since + * the trigger under test is the root, this makes the assert for all triggers regardless + * of their position in the trigger tree. + */ + public void assertCleared(W window) { + for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) { + if (untypedNamespace instanceof WindowAndTriggerNamespace) { + @SuppressWarnings("unchecked") + WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace; + if (namespace.getWindow().equals(window)) { + Set<?> tagsInUse = stateInternals.getTagsInUse(namespace); + assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty()); + } + } + } + } + + /** + * Returns {@code true} if the {@link TriggerStateMachine} under test is finished for the given + * window. 
+ */ + public boolean isMarkedFinished(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + return false; + } + + return finishedSet.isFinished(executableTrigger); + } + + private StateNamespace windowNamespace(W window) { + return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window)); + } + + /** + * Advance the input watermark to the specified time, then advance the output watermark as far as + * possible. + */ + public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); + } + + /** Advance the processing time to the specified time. */ + public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); + } + + /** + * Inject all the timestamped values (after passing through the window function) as if they + * arrived in a single chunk of a bundle (or work-unit). + */ + @SafeVarargs + public final void injectElements(TimestampedValue<InputT>... 
values) throws Exception { + injectElements(Arrays.asList(values)); + } + + public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception { + for (TimestampedValue<InputT> value : values) { + WindowTracing.trace("TriggerTester.injectElements: {}", value); + } + + List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size()); + + for (TimestampedValue<InputT> input : values) { + try { + InputT value = input.getValue(); + Instant timestamp = input.getTimestamp(); + Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( + windowFn, value, timestamp, GlobalWindow.INSTANCE)); + + for (W window : assignedWindows) { + activeWindows.addActiveForTesting(window); + + // Today, triggers assume onTimer firing at the watermark time, whether or not they + // explicitly set the timer themselves. So this tester must set it. + timerInternals.setTimer( + TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + + windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + for (WindowedValue<InputT> windowedValue : windowedValues) { + for (BoundedWindow untypedWindow : windowedValue.getWindows()) { + // SDK is responsible for type safety + @SuppressWarnings("unchecked") + W window = mergeResult((W) untypedWindow); + + TriggerStateMachine.OnElementContext context = contextFactory.createOnElementContext(window, + new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), + executableTrigger, getFinishedSet(window)); + + if (!context.trigger().isFinished()) { + executableTrigger.invokeOnElement(context); + } + } + } + } + + public boolean shouldFire(W window) throws Exception { + TriggerStateMachine.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); + 
executableTrigger.getSpec().prefetchShouldFire(context.state()); + return executableTrigger.invokeShouldFire(context); + } + + public void fireIfShouldFire(W window) throws Exception { + TriggerStateMachine.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); + + executableTrigger.getSpec().prefetchShouldFire(context.state()); + if (executableTrigger.invokeShouldFire(context)) { + executableTrigger.getSpec().prefetchOnFire(context.state()); + executableTrigger.invokeOnFire(context); + if (context.trigger().isFinished()) { + activeWindows.remove(window); + executableTrigger.invokeClear(context); + } + } + } + + public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) { + getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value); + } + + /** + * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge + * events on to the trigger under test. Does not persist the fact that merging happened, + * since it is just to test the trigger's {@code OnMerge} method. 
+ */ + public final void mergeWindows() throws Exception { + windowToMergeResult.clear(); + activeWindows.merge(new MergeCallback<W>() { + @Override + public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} + + @Override + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = new ArrayList<W>(); + for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + if (activeWindows.isActive(window)) { + activeToBeMerged.add(window); + } + } + Map<W, FinishedTriggers> mergingFinishedSets = + Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); + for (W oldWindow : activeToBeMerged) { + mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow)); + } + executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, + new TestTimers(windowNamespace(mergeResult)), executableTrigger, + getFinishedSet(mergeResult), mergingFinishedSets)); + timerInternals.setTimer(TimerData.of( + windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + }); + } + + public W mergeResult(W window) { + W result = windowToMergeResult.get(window); + return result == null ? 
window : result; + } + + private FinishedTriggers getFinishedSet(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTriggerStateMachine>()); + finishedSets.put(window, finishedSet); + } + return finishedSet; + } + + private static class TestAssignContext<W extends BoundedWindow> + extends WindowFn<Object, W>.AssignContext { + private Object element; + private Instant timestamp; + private BoundedWindow window; + + public TestAssignContext( + WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { + windowFn.super(); + this.element = element; + this.timestamp = timestamp; + this.window = window; + } + + @Override + public Object element() { + return element; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + @Override + public BoundedWindow window() { + return window; + } + } + + private class TestTimers implements Timers { + private final StateNamespace namespace; + + public TestTimers(StateNamespace namespace) { + checkArgument(namespace instanceof WindowNamespace); + this.namespace = namespace; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public Instant currentProcessingTime() { + return timerInternals.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timerInternals.currentSynchronizedProcessingTime(); + } + + @Override + public Instant currentEventTime() { + return timerInternals.currentInputWatermarkTime(); + } + } +}