http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java new file mode 100644 index 0000000..38d030e --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link AfterPane}. 
+ */ +@RunWith(JUnit4.class) +public class AfterPaneTest { + + SimpleTriggerTester<IntervalWindow> tester; + /** + * Tests that the trigger does fire when enough elements are in a window, and that it only + * fires that window (no leakage). + */ + @Test + public void testAfterPaneElementCountFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + AfterPane.elementCountAtLeast(2), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1); // [0, 10) + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2); // [0, 10) + tester.injectElements(11); // [10, 20) + + assertTrue(tester.shouldFire(window)); // ready to fire + tester.fireIfShouldFire(window); // and finished + assertTrue(tester.isMarkedFinished(window)); + + // But don't finish the other window + assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20)))); + } + + /** + * Tests that after {@code clearState} is called for a window, {@code assertCleared} passes + * for that window. + */ + @Test + public void testClear() throws Exception { + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterPane.elementCountAtLeast(2), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1, 2, 3); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + tester.clearState(window); + tester.assertCleared(window); + } + + /** + * Tests the element count trigger with merging session windows: neither unmerged window is + * ready to fire on one element, the merged window fires and finishes, and a later merged + * window fires and finishes on its own. 
+ */ + @Test + public void testAfterPaneElementCountSessions() throws Exception { + tester = TriggerTester.forTrigger( + AfterPane.elementCountAtLeast(2), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements( + 1, // in [1, 11) + 2); // in [2, 12) + + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11)))); + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12)))); + + tester.mergeWindows(); + + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12)); + assertTrue(tester.shouldFire(mergedWindow)); + 
tester.fireIfShouldFire(mergedWindow); + assertTrue(tester.isMarkedFinished(mergedWindow)); + + // Because we closed the previous window, we don't have it around to merge with. So there + // will be a new FIRE_AND_FINISH result. + tester.injectElements( + 7, // in [7, 17) + 9); // in [9, 19) + + tester.mergeWindows(); + + IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19)); + assertTrue(tester.shouldFire(newMergedWindow)); + tester.fireIfShouldFire(newMergedWindow); + assertTrue(tester.isMarkedFinished(newMergedWindow)); + } + + /** + * Asserts that {@code getWatermarkThatGuaranteesFiring} returns + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} for an element count trigger. + */ + @Test + public void testFireDeadline() throws Exception { + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + } + + /** + * Asserts that the continuation trigger of {@code elementCountAtLeast(100)} equals + * {@code elementCountAtLeast(1)}, and that taking the continuation again gives the same result. + */ + @Test + public void testContinuation() throws Exception { + assertEquals( + AfterPane.elementCountAtLeast(1), + AfterPane.elementCountAtLeast(100).getContinuationTrigger()); + assertEquals( + AfterPane.elementCountAtLeast(1), + AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = AfterPane.elementCountAtLeast(5); + assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java new file mode 100644 index 0000000..13a7acf --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link AfterProcessingTime}. + */ +@RunWith(JUnit4.class) +public class AfterProcessingTimeTest { + + /** + * Tests the basic property that the trigger does wait for processing time to be + * far enough advanced. + */ + @Test + public void testAfterProcessingTimeFixedWindows() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + + // Timer at 15 + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceProcessingTime(new Instant(12)); + assertFalse(tester.shouldFire(firstWindow)); + + // Load up elements in the next window, timer at 17 for them + tester.injectElements(11, 12, 13); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + assertFalse(tester.shouldFire(secondWindow)); + + // Not quite time to fire + tester.advanceProcessingTime(new Instant(14)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first + tester.injectElements(2, 3); + + // Advance past the first timer and fire, finishing the first window + 
+ tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.isMarkedFinished(firstWindow)); + + // The next window fires and finishes now + tester.advanceProcessingTime(new Instant(18)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(secondWindow); + assertTrue(tester.isMarkedFinished(secondWindow)); + } + + /** + * Tests that after {@code clearState} is called for a window, {@code assertCleared} passes + * for that window. + */ + @Test + public void testClear() throws Exception { + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(Duration.millis(10))); + + tester.injectElements(1, 2, 3); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + tester.clearState(window); + tester.assertCleared(window); + } + + /** + * Tests that when windows merge, if the trigger is waiting for "N millis after the first + * element" that it is relative to the earlier of the two merged windows. 
+ */ + @Test + public void testAfterProcessingTimeWithMergingWindow() throws Exception { + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.advanceProcessingTime(new Instant(10)); + tester.injectElements(1); // in [1, 11), timer for 15 + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.advanceProcessingTime(new Instant(12)); + tester.injectElements(3); // in [3, 13), timer for 17 + IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); + assertFalse(tester.shouldFire(secondWindow)); + + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); + + 
+ tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Asserts that {@code getWatermarkThatGuaranteesFiring} returns + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} for a processing-time trigger. + */ + @Test + public void testFireDeadline() throws Exception { + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + } + + /** + * Asserts that the continuation trigger of a delayed processing-time trigger equals a new + * {@link AfterSynchronizedProcessingTime}. + */ + @Test + public void testContinuation() throws Exception { + OnceTrigger firstElementPlus1 = + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)); + assertEquals( + new AfterSynchronizedProcessingTime(), + firstElementPlus1.getContinuationTrigger()); + } + + /** + * Basic test of compatibility check between identical triggers. + */ + @Test + public void testCompatibilityIdentical() throws Exception { + Trigger t1 = AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(1L)); + Trigger t2 = AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(1L)); + assertTrue(t1.isCompatible(t2)); + } + + @Test + public void testToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString()); + } + + @Test + public void testWithDelayToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(5)); + + assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)", + trigger.toString()); + } + + @Test + public void testBuiltUpToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withLateFirings(AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(10))); + + String expected = "AfterWatermark.pastEndOfWindow()" + + ".withLateFirings(AfterProcessingTime" + + ".pastFirstElementInPane()" + + ".plusDelayOf(10 minutes))"; + + assertEquals(expected, trigger.toString()); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java new file mode 100644 index 0000000..7e6e938 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link AfterSynchronizedProcessingTime}. + */ +@RunWith(JUnit4.class) +public class AfterSynchronizedProcessingTimeTest { + + private Trigger underTest = new AfterSynchronizedProcessingTime(); + + /** + * NOTE(review): this test builds its tester from {@link AfterProcessingTime}, not from + * {@code underTest}; it looks copied from AfterProcessingTimeTest - confirm that is intended. + */ + @Test + public void testAfterProcessingTimeWithFixedWindows() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + + // Timer at 15 + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceProcessingTime(new Instant(12)); + assertFalse(tester.shouldFire(firstWindow)); + + // Load up elements in the next window, timer at 17 for them + tester.injectElements(11, 12, 13); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + assertFalse(tester.shouldFire(secondWindow)); + + // Not quite time to fire + tester.advanceProcessingTime(new Instant(14)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first + tester.injectElements(2, 3); + + // Advance past the first timer and fire, finishing the first window + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(firstWindow)); + 
+ assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.isMarkedFinished(firstWindow)); + + // The next window fires and finishes now + tester.advanceProcessingTime(new Instant(18)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(secondWindow); + assertTrue(tester.isMarkedFinished(secondWindow)); + } + + /** + * NOTE(review): this test builds its tester from {@link AfterProcessingTime}, not from + * {@code underTest}; it looks copied from AfterProcessingTimeTest - confirm that is intended. + */ + @Test + public void testAfterProcessingTimeWithMergingWindow() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + Sessions.withGapDuration(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + tester.injectElements(1); // in [1, 11), timer for 15 + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.advanceProcessingTime(new Instant(12)); + tester.injectElements(3); // in [3, 13), timer for 17 + IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); + assertFalse(tester.shouldFire(secondWindow)); + + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); + + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Asserts that {@code getWatermarkThatGuaranteesFiring} returns + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} for the trigger under test. + */ + @Test + public void testFireDeadline() throws Exception { + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + underTest.getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + } + + /** + * Asserts that the trigger under test equals its own continuation trigger. + */ + @Test + public void testContinuation() throws Exception { + assertEquals(underTest, underTest.getContinuationTrigger()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java ---------------------------------------------------------------------- diff 
--git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java new file mode 100644 index 0000000..084027b --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests the {@link AfterWatermark} triggers. 
+ */ +@RunWith(JUnit4.class) +public class AfterWatermarkTest { + + @Mock private OnceTrigger mockEarly; + @Mock private OnceTrigger mockLate; + + private SimpleTriggerTester<IntervalWindow> tester; + /** Returns a Mockito argument matcher that accepts any {@link Trigger.TriggerContext}. */ + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.<Trigger.TriggerContext>any(); + } + /** Returns a Mockito argument matcher that accepts any {@link Trigger.OnElementContext}. */ + private static Trigger.OnElementContext anyElementContext() { + return Mockito.<Trigger.OnElementContext>any(); + } + + /** + * Injects the elements one at a time, first stubbing {@code onElement} on both mock + * triggers to do nothing. + */ + private void injectElements(int... elements) throws Exception { + for (int element : elements) { + doNothing().when(mockEarly).onElement(anyElementContext()); + doNothing().when(mockLate).onElement(anyElementContext()); + tester.injectElements(element); + } + } + + /** Initializes the {@code @Mock}-annotated fields before each test. */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + /** + * Helper, not a test case itself: checks that the window is not ready while the mock trigger + * declines to fire, is ready once the mock agrees, and is not marked finished after firing. 
+ */ + public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) + throws Exception { + + // Don't fire due to mock saying no + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + assertFalse(tester.shouldFire(window)); // not ready + + // Fire due to mock trigger; early trigger is required to be a OnceTrigger + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // ready + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + @Test + public void testEarlyAndAtWatermark() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(mockEarly), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void 
testAtWatermarkAndLate() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // No early firing, just double checking + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true); + assertFalse(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + @Test + public void testEarlyAndAtWatermarkAndLate() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(mockEarly) + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + /** + * Tests that if the EOW is finished in both as well as the merged window, then + * it is finished in the merged result. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testOnMergeAlreadyFinished() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it finished + tester.mergeWindows(); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testOnMergeRewinds() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the watermark trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that if the EOW is finished in both as well as the merged window, then + * it is finished in the merged result. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it on the late trigger + tester.mergeWindows(); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testEarlyAndLateOnMergeRewinds() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the early trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testFromEndOfWindowToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow(); + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } + + @Test + public void testEarlyFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString()); + } 
+ + @Test + public void testLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); + } + + @Test + public void testEarlyAndLateFiringsToString() { + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(StubTrigger.named("t1")) + .withLateFirings(StubTrigger.named("t2")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", + trigger.toString()); + } + + @Test + public void testToStringExcludesNeverTrigger() { + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(Never.ever()) + .withLateFirings(Never.ever()); + + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java new file mode 100644 index 0000000..673e555 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link DefaultTrigger}, which should be equivalent to + * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + */ +@RunWith(JUnit4.class) +public class DefaultTriggerTest { + + SimpleTriggerTester<IntervalWindow> tester; + + @Test + public void testDefaultTriggerFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + FixedWindows.of(Duration.millis(100))); + + tester.injectElements( + 1, // [0, 100) + 101); // [100, 200) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200)); + + // Advance the watermark almost to the end of the first window. 
+ tester.advanceInputWatermark(new Instant(99)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark past end of the first window, which is then ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Fire, but the first window is still allowed to fire + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark to 200, then both are ready + tester.advanceInputWatermark(new Instant(200)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + } + + @Test + public void testDefaultTriggerSlidingWindows() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); + + tester.injectElements( + 1, // [-50, 50), [0, 100) + 50); // [0, 100), [50, 150) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100)); + IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150)); + + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 50, the first becomes ready; it stays ready after firing + tester.advanceInputWatermark(new Instant(50)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + 
assertFalse(tester.shouldFire(thirdWindow)); + + // At 99, the first is still the only one ready + tester.advanceInputWatermark(new Instant(99)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 100, the first and second are ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + assertFalse(tester.isMarkedFinished(thirdWindow)); + } + + @Test + public void testDefaultTriggerSessions() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + Sessions.withGapDuration(Duration.millis(100))); + + tester.injectElements( + 1, // [1, 101) + 50); // [50, 150) + tester.mergeWindows(); + + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150)); + + // Not ready in any window yet + tester.advanceInputWatermark(new Instant(100)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // The first window is "ready": the caller owns knowledge of which windows are merged away + tester.advanceInputWatermark(new Instant(149)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now ready on all windows + tester.advanceInputWatermark(new Instant(150)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertTrue(tester.shouldFire(mergedWindow)); + 
+ // Ensure it repeats + tester.fireIfShouldFire(mergedWindow); + assertTrue(tester.shouldFire(mergedWindow)); + + assertFalse(tester.isMarkedFinished(mergedWindow)); + } + + @Test + public void testFireDeadline() throws Exception { + assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + assertEquals(GlobalWindow.INSTANCE.maxTimestamp(), + DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE)); + } + + @Test + public void testContinuation() throws Exception { + assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java new file mode 100644 index 0000000..1e3a1ff --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ExecutableTrigger}. + */ +@RunWith(JUnit4.class) +public class ExecutableTriggerTest { + + @Test + public void testIndexAssignmentLeaf() throws Exception { + StubTrigger t1 = new StubTrigger(); + ExecutableTrigger executable = ExecutableTrigger.create(t1); + assertEquals(0, executable.getTriggerIndex()); + } + + @Test + public void testIndexAssignmentOneLevel() throws Exception { + StubTrigger t1 = new StubTrigger(); + StubTrigger t2 = new StubTrigger(); + StubTrigger t = new StubTrigger(t1, t2); + + ExecutableTrigger executable = ExecutableTrigger.create(t); + + assertEquals(0, executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertSame(t1, executable.subTriggers().get(0).getSpec()); + assertEquals(2, executable.subTriggers().get(1).getTriggerIndex()); + assertSame(t2, executable.subTriggers().get(1).getSpec()); + } + + @Test + public void testIndexAssignmentTwoLevel() throws Exception { + StubTrigger t11 = new StubTrigger(); + StubTrigger t12 = new StubTrigger(); + StubTrigger t13 = new StubTrigger(); + StubTrigger t14 = new StubTrigger(); + StubTrigger t21 = new StubTrigger(); + StubTrigger t22 = new StubTrigger(); + StubTrigger t1 = new StubTrigger(t11, t12, t13, t14); + StubTrigger t2 = new StubTrigger(t21, t22); + StubTrigger t = new StubTrigger(t1, t2); + + ExecutableTrigger executable = ExecutableTrigger.create(t); + + assertEquals(0, 
executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree()); + assertEquals(6, executable.subTriggers().get(1).getTriggerIndex()); + + assertSame(t1, executable.getSubTriggerContaining(1).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(6).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(2).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(3).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(5).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(7).getSpec()); + } + + private static class StubTrigger extends Trigger { + + @SafeVarargs + protected StubTrigger(Trigger... subTriggers) { + super(Arrays.asList(subTriggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { } + + @Override + public void onMerge(OnMergeContext c) throws Exception { } + + @Override + public void clear(TriggerContext c) throws Exception { + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean isCompatible(Trigger other) { + return false; + } + + @Override + public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public boolean shouldFire(TriggerContext c) { + return false; + } + + @Override + public void onFire(TriggerContext c) { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java new file mode 
100644 index 0000000..7f74620 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersBitSet}. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersBitSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. + */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + /** + * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no + * others. 
+ */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10); + assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java new file mode 100644 index 0000000..a66f74f --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; + +/** + * Generalized tests for {@link FinishedTriggers} implementations. + */ +public class FinishedTriggersProperties { + /** + * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set + * finished, it is correctly reported as finished. + */ + public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertFalse(finishedSet.isFinished(trigger)); + finishedSet.setFinished(trigger, true); + assertTrue(finishedSet.isFinished(trigger)); + } + + /** + * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly + * reported as finished. + */ + public static void verifyGetAfterSet(FinishedTriggers finishedSet) { + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), + AfterAll.of( + AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); + + verifyGetAfterSet(finishedSet, trigger); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0)); + } + + /** + * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no + * others. 
+ */ + public static void verifyClearRecursively(FinishedTriggers finishedSet) { + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), + AfterAll.of( + AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); + + // Set them all finished. This method is not on a trigger as it makes no sense outside tests. + setFinishedRecursively(finishedSet, trigger); + assertTrue(finishedSet.isFinished(trigger)); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1))); + + // Clear just the second AfterAll + finishedSet.clearRecursively(trigger.subTriggers().get(1)); + + // Check that the first and all that are still finished + assertTrue(finishedSet.isFinished(trigger)); + verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0)); + verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1)); + } + + private static void setFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + finishedSet.setFinished(trigger, true); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + setFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertTrue(finishedSet.isFinished(trigger)); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + verifyFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyUnfinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertFalse(finishedSet.isFinished(trigger)); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + verifyUnfinishedRecursively(finishedSet, subTrigger); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java new file mode 100644 index 0000000..072d264 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import java.util.HashSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersSet}. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. 
+ */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); + } + + /** + * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no + * others. + */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersSet finishedSet = + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); + assertThat(finishedSet.copy().getFinishedTriggers(), + not(theInstance(finishedSet.getFinishedTriggers()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java new file mode 100644 index 0000000..fb2b4d5 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.windowing;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.util.TriggerTester;
import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link Never}.
 */
@RunWith(JUnit4.class)
public class NeverTest {
  /** Size of the fixed windows used by every test in this class. */
  private static final Duration WINDOW_SIZE = Duration.standardMinutes(5);

  private SimpleTriggerTester<IntervalWindow> triggerTester;

  /** Creates a tester for {@code Never.ever()} over five-minute fixed windows. */
  @Before
  public void setup() throws Exception {
    triggerTester = TriggerTester.forTrigger(Never.ever(), FixedWindows.of(WINDOW_SIZE));
  }

  /**
   * The trigger is not ready to fire after an element arrives, and still not ready once
   * the watermark has advanced to the maximum timestamp.
   */
  @Test
  public void falseAfterEndOfWindow() throws Exception {
    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));

    Instant windowStart = new Instant(0);
    IntervalWindow window = new IntervalWindow(windowStart, windowStart.plus(WINDOW_SIZE));

    assertThat(triggerTester.shouldFire(window), is(false));

    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
    assertThat(triggerTester.shouldFire(window), is(false));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java new file mode 100644 index 0000000..7289d97 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link OrFinallyTrigger}. 
+ */ +@RunWith(JUnit4.class) +public class OrFinallyTriggerTest { + + private SimpleTriggerTester<IntervalWindow> tester; + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires and finishes, the {@code OrFinally} also fires and finishes. + */ + @Test + public void testActualFiresAndFinishes() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires and finishes + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires but does not finish, the {@code OrFinally} also fires and also does not + * finish. 
+ */ + @Test + public void testActualFiresOnly() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but does not finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // And again + tester.injectElements(3, 4); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + /** + * Tests that if the first trigger rewinds to be non-finished in the merged window, + * then it becomes the currently active trigger again, with real triggers. 
+ */ + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterPane.elementCountAtLeast(5) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + // Finished the orFinally in the first window + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Set up second window where it is not done + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now adding 3 more makes the main trigger ready to fire + tester.injectElements(1, 2, 3, 4, 5); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that for {@code OrFinally(actual, until)} when {@code actual} + * fires but does not finish, then {@code until} fires and finishes, the + * whole thing fires and finished. 
+ */ + @Test + public void testActualFiresButUntilFinishes() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(3)), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // Before any firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but doesn't finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // The until fires and finishes; the trigger is finished + tester.injectElements(3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(new Instant(9), + Repeatedly.forever(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow()) + .orFinally(AfterPane.elementCountAtLeast(1)) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), + AfterPane.elementCountAtLeast(100) + .orFinally(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterPane.elementCountAtLeast(10)) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger triggerA = 
AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger triggerB = AfterWatermark.pastEndOfWindow(); + Trigger aOrFinallyB = triggerA.orFinally(triggerB); + Trigger bOrFinallyA = triggerB.orFinally(triggerA); + assertEquals( + Repeatedly.forever( + triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())), + aOrFinallyB.getContinuationTrigger()); + assertEquals( + Repeatedly.forever( + triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())), + bOrFinallyA.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2")); + assertEquals("t1.orFinally(t2)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java new file mode 100644 index 0000000..6e8930d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link Repeatedly}. + */ +@RunWith(JUnit4.class) +public class RepeatedlyTest { + + @Mock private Trigger mockTrigger; + private SimpleTriggerTester<IntervalWindow> tester; + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.<Trigger.TriggerContext>any(); + } + + public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception { + MockitoAnnotations.initMocks(this); + tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn); + } + + /** + * Tests that onElement correctly passes the data on to the subtrigger. 
+ */ + @Test + public void testOnElement() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + tester.injectElements(37); + verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any()); + } + + /** + * Tests that the repeatedly is ready to fire whenever the subtrigger is ready. + */ + @Test + public void testShouldFire() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + + when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any())) + .thenReturn(false); + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + } + + /** + * Tests that the watermark that guarantees firing is that of the subtrigger. + */ + @Test + public void testFireDeadline() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + Instant arbitraryInstant = new Instant(34957849); + + when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any())) + .thenReturn(arbitraryInstant); + + assertThat( + Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window), + equalTo(arbitraryInstant)); + } + + @Test + public void testContinuation() throws Exception { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + Trigger repeatedly = Repeatedly.forever(trigger); + assertEquals( + Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger()); + assertEquals( + Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()), + repeatedly.getContinuationTrigger().getContinuationTrigger()); + } + + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + 
Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception 
{ + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + + @Test + public void testToString() { + Trigger trigger = Repeatedly.forever(new StubTrigger() { + @Override + public String toString() { + return "innerTrigger"; + } + }); + + assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java new file mode 100644 index 0000000..83077f4 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ReshuffleTrigger}. + */ +@RunWith(JUnit4.class) +public class ReshuffleTriggerTest { + + /** Public so that other tests can instantiate {@link ReshuffleTrigger}. 
*/ + public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() { + return new ReshuffleTrigger<>(); + } + + @Test + public void testShouldFire() throws Exception { + TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( + new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); + IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400)); + assertTrue(tester.shouldFire(arbitraryWindow)); + } + + @Test + public void testOnTimer() throws Exception { + TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger( + new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100))); + IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200)); + tester.fireIfShouldFire(arbitraryWindow); + assertFalse(tester.isMarkedFinished(arbitraryWindow)); + } + + @Test + public void testToString() { + Trigger trigger = new ReshuffleTrigger<>(); + assertEquals("ReshuffleTrigger()", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java new file mode 100644 index 0000000..b258a79 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.windowing;

import com.google.common.collect.Lists;
import java.util.List;
import org.joda.time.Instant;

/**
 * A {@link OnceTrigger} for tests whose callbacks do nothing: it never reports that it should
 * fire, and it returns {@code null} both as its continuation trigger and as the watermark that
 * guarantees firing.
 */
abstract class StubTrigger extends Trigger.OnceTrigger {

  /** Passes an empty sub-trigger list to the superclass. */
  protected StubTrigger() {
    super(Lists.<Trigger>newArrayList());
  }

  /**
   * Creates a stub {@link Trigger} whose {@link #toString()} returns the given name.
   */
  static StubTrigger named(final String name) {
    return new StubTrigger() {
      @Override
      public String toString() {
        return name;
      }
    };
  }

  /** Always {@code false}. */
  @Override
  public boolean shouldFire(TriggerContext context) throws Exception {
    return false;
  }

  /** Does nothing. */
  @Override
  public void onElement(OnElementContext c) throws Exception {
  }

  /** Does nothing. */
  @Override
  public void onMerge(OnMergeContext c) throws Exception {
  }

  /** Does nothing. */
  @Override
  protected void onOnlyFiring(TriggerContext context) throws Exception {
  }

  /** Always {@code null}. */
  @Override
  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
    return null;
  }

  /** Always {@code null}. */
  @Override
  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java new file mode 100644 index 0000000..cfc03b2 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Trigger}. 
+ */ +@RunWith(JUnit4.class) +public class TriggerTest { + + @Test + public void testTriggerToString() throws Exception { + assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString()); + assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", + Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString()); + } + + @Test + public void testIsCompatible() throws Exception { + assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); + assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))) + .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); + + assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); + assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null))) + .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); + } + + private static class Trigger1 extends Trigger { + + private Trigger1(List<Trigger> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger getContinuationTrigger( + List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + } + + private static class Trigger2 extends Trigger { + + private Trigger2(List<Trigger> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger getContinuationTrigger( + List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant 
getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + } +}