http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java new file mode 100644 index 0000000..7e6e938 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link AfterSynchronizedProcessingTime}. + */ +@RunWith(JUnit4.class) +public class AfterSynchronizedProcessingTimeTest { + + private Trigger underTest = new AfterSynchronizedProcessingTime(); + + @Test + public void testAfterProcessingTimeWithFixedWindows() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + FixedWindows.of(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + + // Timer at 15 + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceProcessingTime(new Instant(12)); + assertFalse(tester.shouldFire(firstWindow)); + + // Load up elements in the next window, timer at 17 for them + tester.injectElements(11, 12, 13); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + assertFalse(tester.shouldFire(secondWindow)); + + // Not quite time to fire + tester.advanceProcessingTime(new Instant(14)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first + tester.injectElements(2, 3); + + // Advance past the first timer and fire, finishing the first window + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(firstWindow)); + 
assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.isMarkedFinished(firstWindow)); + + // The next window fires and finishes now + tester.advanceProcessingTime(new Instant(18)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(secondWindow); + assertTrue(tester.isMarkedFinished(secondWindow)); + } + + @Test + public void testAfterProcessingTimeWithMergingWindow() throws Exception { + Duration windowDuration = Duration.millis(10); + SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( + AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.millis(5)), + Sessions.withGapDuration(windowDuration)); + + tester.advanceProcessingTime(new Instant(10)); + tester.injectElements(1); // in [1, 11), timer for 15 + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.advanceProcessingTime(new Instant(12)); + tester.injectElements(3); // in [3, 13), timer for 17 + IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); + assertFalse(tester.shouldFire(secondWindow)); + + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); + + tester.advanceProcessingTime(new Instant(16)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testFireDeadline() throws Exception { + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + underTest.getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + } + + @Test + public void testContinuation() throws Exception { + assertEquals(underTest, underTest.getContinuationTrigger()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java new file mode 100644 index 0000000..084027b --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests the {@link AfterWatermark} triggers. + */ +@RunWith(JUnit4.class) +public class AfterWatermarkTest { + + @Mock private OnceTrigger mockEarly; + @Mock private OnceTrigger mockLate; + + private SimpleTriggerTester<IntervalWindow> tester; + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.<Trigger.TriggerContext>any(); + } + private static Trigger.OnElementContext anyElementContext() { + return Mockito.<Trigger.OnElementContext>any(); + } + + private void injectElements(int... 
elements) throws Exception { + for (int element : elements) { + doNothing().when(mockEarly).onElement(anyElementContext()); + doNothing().when(mockLate).onElement(anyElementContext()); + tester.injectElements(element); + } + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) + throws Exception { + + // Don't fire due to mock saying no + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + assertFalse(tester.shouldFire(window)); // not ready + + // Fire due to mock trigger; early trigger is required to be a OnceTrigger + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(window)); // ready + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + @Test + public void testEarlyAndAtWatermark() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(mockEarly), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testAtWatermarkAndLate() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // No early firing, just double checking + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true); + assertFalse(tester.shouldFire(window)); + 
tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + @Test + public void testEarlyAndAtWatermarkAndLate() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(mockEarly) + .withLateFirings(mockLate), + FixedWindows.of(Duration.millis(100))); + + injectElements(1); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + testRunningAsTrigger(mockEarly, window); + + // Fire due to watermark + when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + testRunningAsTrigger(mockLate, window); + } + + /** + * Tests that if the EOW is finished in both as well as the merged window, then + * it is finished in the merged result. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testOnMergeAlreadyFinished() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it finished + tester.mergeWindows(); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testOnMergeRewinds() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the second trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the watermark trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the second trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that if the EOW is finished in both as well as the merged window, then + * it is finished in the merged result. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.injectElements(1); + tester.injectElements(5); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + tester.fireIfShouldFire(secondWindow); + + // Merging should leave it on the late trigger + tester.mergeWindows(); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that the trigger rewinds to be non-finished in the merged window. + * + * <p>Because windows are discarded when a trigger finishes, we need to embed this + * in a sequence in order to check that it is re-activated. So this test is potentially + * sensitive to other triggers' correctness. 
+ */ + @Test + public void testEarlyAndLateOnMergeRewinds() throws Exception { + tester = TriggerTester.forTrigger( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + + // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + tester.fireIfShouldFire(firstWindow); + + // Confirm that we are on the late trigger by probing + assertFalse(tester.shouldFire(firstWindow)); + tester.injectElements(1); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Merging should re-activate the early trigger in the merged window + tester.mergeWindows(); + + // Confirm that we are not on the late trigger by probing + assertFalse(tester.shouldFire(mergedWindow)); + tester.injectElements(1); + assertFalse(tester.shouldFire(mergedWindow)); + + // And confirm that advancing the watermark fires again + tester.advanceInputWatermark(new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testFromEndOfWindowToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow(); + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } + + @Test + public void testEarlyFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString()); + }
+ + @Test + public void testLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1")); + + assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); + } + + @Test + public void testEarlyAndLateFiringsToString() { + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(StubTrigger.named("t1")) + .withLateFirings(StubTrigger.named("t2")); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", + trigger.toString()); + } + + @Test + public void testToStringExcludesNeverTrigger() { + Trigger trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(Never.ever()) + .withLateFirings(Never.ever()); + + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java new file mode 100644 index 0000000..673e555 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests the {@link DefaultTrigger}, which should be equivalent to + * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + */ +@RunWith(JUnit4.class) +public class DefaultTriggerTest { + + SimpleTriggerTester<IntervalWindow> tester; + + @Test + public void testDefaultTriggerFixedWindows() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + FixedWindows.of(Duration.millis(100))); + + tester.injectElements( + 1, // [0, 100) + 101); // [100, 200) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200)); + + // Advance the watermark almost to the end of the first window. 
+ tester.advanceInputWatermark(new Instant(99)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark past end of the first window, which is then ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Fire, but the first window is still allowed to fire + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + + // Advance watermark to 200, then both are ready + tester.advanceInputWatermark(new Instant(200)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + } + + @Test + public void testDefaultTriggerSlidingWindows() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); + + tester.injectElements( + 1, // [-50, 50), [0, 100) + 50); // [0, 100), [50, 150) + + IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100)); + IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150)); + + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 50, the first becomes ready; it stays ready after firing + tester.advanceInputWatermark(new Instant(50)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + 
assertFalse(tester.shouldFire(thirdWindow)); + + // At 99, the first is still the only one ready + tester.advanceInputWatermark(new Instant(99)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + + // At 100, the first and second are ready + tester.advanceInputWatermark(new Instant(100)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(thirdWindow)); + tester.fireIfShouldFire(firstWindow); + + assertFalse(tester.isMarkedFinished(firstWindow)); + assertFalse(tester.isMarkedFinished(secondWindow)); + assertFalse(tester.isMarkedFinished(thirdWindow)); + } + + @Test + public void testDefaultTriggerSessions() throws Exception { + tester = TriggerTester.forTrigger( + DefaultTrigger.of(), + Sessions.withGapDuration(Duration.millis(100))); + + tester.injectElements( + 1, // [1, 101) + 50); // [50, 150) + tester.mergeWindows(); + + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150)); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150)); + + // Not ready in any window yet + tester.advanceInputWatermark(new Instant(100)); + assertFalse(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // The first window is "ready": the caller owns knowledge of which windows are merged away + tester.advanceInputWatermark(new Instant(149)); + assertTrue(tester.shouldFire(firstWindow)); + assertFalse(tester.shouldFire(secondWindow)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now ready on all windows + tester.advanceInputWatermark(new Instant(150)); + assertTrue(tester.shouldFire(firstWindow)); + assertTrue(tester.shouldFire(secondWindow)); + assertTrue(tester.shouldFire(mergedWindow)); + 
+ // Ensure it repeats + tester.fireIfShouldFire(mergedWindow); + assertTrue(tester.shouldFire(mergedWindow)); + + assertFalse(tester.isMarkedFinished(mergedWindow)); + } + + @Test + public void testFireDeadline() throws Exception { + assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring( + new IntervalWindow(new Instant(0), new Instant(10)))); + assertEquals(GlobalWindow.INSTANCE.maxTimestamp(), + DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE)); + } + + @Test + public void testContinuation() throws Exception { + assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java new file mode 100644 index 0000000..fb2b4d5 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Never}. + */ +@RunWith(JUnit4.class) +public class NeverTest { + private SimpleTriggerTester<IntervalWindow> triggerTester; + + @Before + public void setup() throws Exception { + triggerTester = + TriggerTester.forTrigger( + Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); + } + + @Test + public void falseAfterEndOfWindow() throws Exception { + triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); + IntervalWindow window = + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); + assertThat(triggerTester.shouldFire(window), is(false)); + triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(triggerTester.shouldFire(window), is(false)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java new file mode 100644 index 0000000..7289d97 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -0,0 +1,215 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link OrFinallyTrigger}. + */ +@RunWith(JUnit4.class) +public class OrFinallyTriggerTest { + + private SimpleTriggerTester<IntervalWindow> tester; + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires and finishes, the {@code OrFinally} also fires and finishes. 
+ */ + @Test + public void testActualFiresAndFinishes() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires and finishes + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + /** + * Tests that for {@code OrFinally(actual, ...)} when {@code actual} + * fires but does not finish, the {@code OrFinally} also fires and also does not + * finish. + */ + @Test + public void testActualFiresOnly() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(100)), + FixedWindows.of(Duration.millis(100))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); + + // Not yet firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but does not finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // And again + tester.injectElements(3, 4); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + } + + /** + * Tests that if the first trigger rewinds to be non-finished in the merged window, + * then it becomes the currently active trigger again, with real triggers. 
+ */ + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerTester.forTrigger( + AfterEach.inOrder( + AfterPane.elementCountAtLeast(5) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + Sessions.withGapDuration(Duration.millis(10))); + + // Finished the orFinally in the first window + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + tester.advanceInputWatermark(new Instant(11)); + assertTrue(tester.shouldFire(firstWindow)); + tester.fireIfShouldFire(firstWindow); + + // Set up second window where it is not done + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertFalse(tester.shouldFire(mergedWindow)); + + // Now adding 3 more makes the main trigger ready to fire + tester.injectElements(1, 2, 3, 4, 5); + tester.mergeWindows(); + assertTrue(tester.shouldFire(mergedWindow)); + } + + /** + * Tests that for {@code OrFinally(actual, until)} when {@code actual} + * fires but does not finish, then {@code until} fires and finishes, the + * whole thing fires and finishes. 
+ */ + @Test + public void testActualFiresButUntilFinishes() throws Exception { + tester = TriggerTester.forTrigger( + new OrFinallyTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(3)), + FixedWindows.of(Duration.millis(10))); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + // Before any firing + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + assertFalse(tester.isMarkedFinished(window)); + + // The actual fires but doesn't finish + tester.injectElements(2); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.isMarkedFinished(window)); + + // The until fires and finishes; the trigger is finished + tester.injectElements(3); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertTrue(tester.isMarkedFinished(window)); + } + + @Test + public void testFireDeadline() throws Exception { + BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertEquals(new Instant(9), + Repeatedly.forever(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow()) + .orFinally(AfterPane.elementCountAtLeast(1)) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + assertEquals(new Instant(9), + AfterPane.elementCountAtLeast(100) + .orFinally(AfterWatermark.pastEndOfWindow()) + .getWatermarkThatGuaranteesFiring(window)); + + assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterPane.elementCountAtLeast(10)) + .getWatermarkThatGuaranteesFiring(window)); + } + + @Test + public void testContinuation() throws Exception { + OnceTrigger triggerA = 
AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger triggerB = AfterWatermark.pastEndOfWindow(); + Trigger aOrFinallyB = triggerA.orFinally(triggerB); + Trigger bOrFinallyA = triggerB.orFinally(triggerA); + assertEquals( + Repeatedly.forever( + triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())), + aOrFinallyB.getContinuationTrigger()); + assertEquals( + Repeatedly.forever( + triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())), + bOrFinallyA.getContinuationTrigger()); + } + + @Test + public void testToString() { + Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2")); + assertEquals("t1.orFinally(t2)", trigger.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java new file mode 100644 index 0000000..6e8930d --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link Repeatedly}. + */ +@RunWith(JUnit4.class) +public class RepeatedlyTest { + + @Mock private Trigger mockTrigger; + private SimpleTriggerTester<IntervalWindow> tester; + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.<Trigger.TriggerContext>any(); + } + + public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception { + MockitoAnnotations.initMocks(this); + tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn); + } + + /** + * Tests that onElement correctly passes the data on to the subtrigger. 
+ */ + @Test + public void testOnElement() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + tester.injectElements(37); + verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any()); + } + + /** + * Tests that the repeatedly is ready to fire whenever the subtrigger is ready. + */ + @Test + public void testShouldFire() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + + when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any())) + .thenReturn(false); + assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); + } + + /** + * Tests that the watermark that guarantees firing is that of the subtrigger. + */ + @Test + public void testFireDeadline() throws Exception { + setUp(FixedWindows.of(Duration.millis(10))); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + Instant arbitraryInstant = new Instant(34957849); + + when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any())) + .thenReturn(arbitraryInstant); + + assertThat( + Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window), + equalTo(arbitraryInstant)); + } + + @Test + public void testContinuation() throws Exception { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + Trigger repeatedly = Repeatedly.forever(trigger); + assertEquals( + Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger()); + assertEquals( + Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()), + repeatedly.getContinuationTrigger().getContinuationTrigger()); + } + + @Test + public void testShouldFireAfterMerge() throws Exception { + tester = TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + 
Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); + assertFalse(tester.shouldFire(firstWindow)); + + tester.injectElements(5); + IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); + assertFalse(tester.shouldFire(secondWindow)); + + // Merge them, if the merged window were on the second trigger, it would be ready + tester.mergeWindows(); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); + assertTrue(tester.shouldFire(mergedWindow)); + } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception 
{ + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerTester<GlobalWindow> tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + + @Test + public void testToString() { + Trigger trigger = Repeatedly.forever(new StubTrigger() { + @Override + public String toString() { + return "innerTrigger"; + } + }); + + assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java new file mode 100644 index 0000000..b258a79 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.collect.Lists; +import java.util.List; +import org.joda.time.Instant; + +/** + * No-op {@link OnceTrigger} implementation for testing. + */ +abstract class StubTrigger extends Trigger.OnceTrigger { + /** + * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}. 
+ */ + static StubTrigger named(final String name) { + return new StubTrigger() { + @Override + public String toString() { + return name; + } + }; + } + + protected StubTrigger() { + super(Lists.<Trigger>newArrayList()); + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + } + + @Override + public void onElement(OnElementContext c) throws Exception { + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + } + + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + return false; + } + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java new file mode 100644 index 0000000..cfc03b2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Trigger}. + */ +@RunWith(JUnit4.class) +public class TriggerTest { + + @Test + public void testTriggerToString() throws Exception { + assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString()); + assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", + Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString()); + } + + @Test + public void testIsCompatible() throws Exception { + assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); + assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))) + .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); + + assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); + assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null))) + .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null))))); + } + + private static class Trigger1 extends Trigger { + + private Trigger1(List<Trigger> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger 
getContinuationTrigger( + List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + } + + private static class Trigger2 extends Trigger { + + private Trigger2(List<Trigger> subTriggers) { + super(subTriggers); + } + + @Override + public void onElement(Trigger.OnElementContext c) { } + + @Override + public void onMerge(Trigger.OnMergeContext c) { } + + @Override + protected Trigger getContinuationTrigger( + List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return false; + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java new file mode 100644 index 0000000..5fe17ad --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; +import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Test utility that runs a {@link Trigger}, using an in-memory stub implementation to provide + * the {@link StateInternals}. + * + * @param <W> The type of windows being used. + */ +public class TriggerTester<InputT, W extends BoundedWindow> { + + /** + * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps + * can be conflated. Today, triggers should not observe the element type, so this is the + * only trigger tester that needs to be used. + */ + public static class SimpleTriggerTester<W extends BoundedWindow> + extends TriggerTester<Integer, W> { + + private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { + super(windowingStrategy); + } + + public void injectElements(int... 
values) throws Exception { + List<TimestampedValue<Integer>> timestampedValues = + Lists.newArrayListWithCapacity(values.length); + for (int value : values) { + timestampedValues.add(TimestampedValue.of(value, new Instant(value))); + } + injectElements(timestampedValues); + } + + public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception { + return new SimpleTriggerTester<>( + windowingStrategy.withAllowedLateness(allowedLateness)); + } + } + + protected final WindowingStrategy<Object, W> windowingStrategy; + + private final TestInMemoryStateInternals<?> stateInternals = + new TestInMemoryStateInternals<Object>(null /* key */); + private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + private final TriggerContextFactory<W> contextFactory; + private final WindowFn<Object, W> windowFn; + private final ActiveWindowSet<W> activeWindows; + private final Map<W, W> windowToMergeResult; + + /** + * An {@link ExecutableTrigger} built from the {@link Trigger} + * under test. + */ + private final ExecutableTrigger executableTrigger; + + /** + * A map from a window and trigger to whether that trigger is finished for the window. + */ + private final Map<W, FinishedTriggers> finishedSets; + + public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger( + Trigger trigger, WindowFn<Object, W> windowFn) + throws Exception { + WindowingStrategy<Object, W> windowingStrategy = + WindowingStrategy.of(windowFn).withTrigger(trigger) + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + .withMode(windowFn.isNonMerging() + ? 
AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES); + + return new SimpleTriggerTester<>(windowingStrategy); + } + + public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger( + Trigger trigger, WindowFn<Object, W> windowFn) throws Exception { + WindowingStrategy<Object, W> strategy = + WindowingStrategy.of(windowFn).withTrigger(trigger) + // Merging requires accumulation mode or early firings can break up a session. + // Not currently an issue with the tester (because we never GC) but we don't want + // mystery failures due to violating this need. + .withMode(windowFn.isNonMerging() + ? AccumulationMode.DISCARDING_FIRED_PANES + : AccumulationMode.ACCUMULATING_FIRED_PANES); + + return new TriggerTester<>(strategy); + } + + protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception { + this.windowingStrategy = windowingStrategy; + this.windowFn = windowingStrategy.getWindowFn(); + this.executableTrigger = windowingStrategy.getTrigger(); + this.finishedSets = new HashMap<>(); + + this.activeWindows = + windowFn.isNonMerging() + ? new NonMergingActiveWindowSet<W>() + : new MergingActiveWindowSet<W>(windowFn, stateInternals); + this.windowToMergeResult = new HashMap<>(); + + this.contextFactory = + new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows); + } + + /** + * Instructs the trigger to clear its state for the given window. + */ + public void clearState(W window) throws Exception { + executableTrigger.invokeClear(contextFactory.base(window, + new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window))); + } + + /** + * Asserts that the trigger has actually cleared all of its state for the given window. Since + * the trigger under test is the root, this makes the assert for all triggers regardless + * of their position in the trigger tree. 
+ */ + public void assertCleared(W window) { + for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) { + /* Only window-and-trigger namespaces are inspected; every other namespace kind is skipped. */ if (untypedNamespace instanceof WindowAndTriggerNamespace) { + @SuppressWarnings("unchecked") + WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace; + if (namespace.getWindow().equals(window)) { + Set<?> tagsInUse = stateInternals.getTagsInUse(namespace); + assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty()); + } + } + } + } + + /** + * Returns {@code true} if the {@link Trigger} under test is finished for the given window. + * A window that has no recorded finished set yet is reported as not finished. + */ + public boolean isMarkedFinished(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + return false; + } + + return finishedSet.isFinished(executableTrigger); + } + + /** Returns the window-scoped {@link StateNamespace} for {@code window}, built with the window coder of {@code windowFn}; {@code window} must not be null. */ private StateNamespace windowNamespace(W window) { + return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window)); + } + + /** + * Advance the input watermark to the specified time, then advance the output watermark as far as + * possible. + * + * <p>NOTE(review): the only call made here is {@code timerInternals.advanceInputWatermark}, with + * {@code TimerCallback.NO_OP} as the callback; any output-watermark advance presumably happens + * inside {@code timerInternals} - confirm. + */ + public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); + } + + /** Advance the processing time to the specified time; {@code TimerCallback.NO_OP} is passed as the timer callback. */ + public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); + } + + /** + * Inject all the timestamped values (after passing through the window function) as if they + * arrived in a single chunk of a bundle (or work-unit). + * + * <p>Delegates to {@link #injectElements(Collection)}. + */ + @SafeVarargs + public final void injectElements(TimestampedValue<InputT>...
values) throws Exception { + injectElements(Arrays.asList(values)); + } + + /** Injects {@code values} in two passes. First pass: every value is traced, assigned to windows by {@code windowFn}, each assigned window is marked active and given an event-time timer at its max timestamp; any exception raised in this pass is rethrown wrapped in a {@link RuntimeException}. Second pass: for every window of every value (mapped through {@link #mergeResult}), the trigger's on-element handling is invoked unless the trigger is already finished for that window. */ public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception { + for (TimestampedValue<InputT> value : values) { + WindowTracing.trace("TriggerTester.injectElements: {}", value); + } + + List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size()); + + for (TimestampedValue<InputT> input : values) { + try { + InputT value = input.getValue(); + Instant timestamp = input.getTimestamp(); + Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( + windowFn, value, timestamp, GlobalWindow.INSTANCE)); + + for (W window : assignedWindows) { + activeWindows.addActiveForTesting(window); + + // Today, triggers assume onTimer firing at the watermark time, whether or not they + // explicitly set the timer themselves. So this tester must set it. + timerInternals.setTimer( + TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + + windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + for (WindowedValue<InputT> windowedValue : windowedValues) { + for (BoundedWindow untypedWindow : windowedValue.getWindows()) { + // SDK is responsible for type safety + @SuppressWarnings("unchecked") + W window = mergeResult((W) untypedWindow); + + Trigger.OnElementContext context = contextFactory.createOnElementContext(window, + new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), + executableTrigger, getFinishedSet(window)); + + if (!context.trigger().isFinished()) { + executableTrigger.invokeOnElement(context); + } + } + } + } + + /** Prefetches the should-fire state for {@code window} and returns the result of invoking the trigger's should-fire check; the trigger itself is not fired. */ public boolean shouldFire(W window) throws Exception { + Trigger.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); +
executableTrigger.getSpec().prefetchShouldFire(context.state()); + return executableTrigger.invokeShouldFire(context); + } + + /** Runs the should-fire check for {@code window}; when it passes, invokes the trigger's on-fire handling, and if the trigger is finished afterwards removes the window from the active windows and invokes the trigger's clear. Does nothing when the check fails. */ public void fireIfShouldFire(W window) throws Exception { + Trigger.TriggerContext context = contextFactory.base( + window, + new TestTimers(windowNamespace(window)), + executableTrigger, getFinishedSet(window)); + + executableTrigger.getSpec().prefetchShouldFire(context.state()); + if (executableTrigger.invokeShouldFire(context)) { + executableTrigger.getSpec().prefetchOnFire(context.state()); + executableTrigger.invokeOnFire(context); + if (context.trigger().isFinished()) { + activeWindows.remove(window); + executableTrigger.invokeClear(context); + } + } + } + + /** Sets the finished flag of the sub-trigger at position {@code subTriggerIndex} to {@code value} in the finished set of {@code window}. */ public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) { + getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value); + } + + /** + * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge + * events on to the trigger under test. Does not persist the fact that merging happened, + * since it is just to test the trigger's {@code OnMerge} method. + * + * <p>For each merge, every merged-away window is recorded as mapping to the merge result, the + * finished sets of the windows that are still active are handed to the trigger's merge handling, + * and an event-time timer is set at the max timestamp of the merge result.
+ */ + public final void mergeWindows() throws Exception { + /* Drop the merge results recorded by any earlier call. */ windowToMergeResult.clear(); + activeWindows.merge(new MergeCallback<W>() { + @Override + public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {} + + @Override + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = new ArrayList<W>(); + /* Record the merge result for every merged window, but keep only the still-active ones for the trigger. */ for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + if (activeWindows.isActive(window)) { + activeToBeMerged.add(window); + } + } + Map<W, FinishedTriggers> mergingFinishedSets = + Maps.newHashMapWithExpectedSize(activeToBeMerged.size()); + for (W oldWindow : activeToBeMerged) { + mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow)); + } + executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult, + new TestTimers(windowNamespace(mergeResult)), executableTrigger, + getFinishedSet(mergeResult), mergingFinishedSets)); + /* Set an event-time timer at the max timestamp of the merged window. */ timerInternals.setTimer(TimerData.of( + windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME)); + } + }); + } + + /** Returns the window that {@code window} was merged into according to the most recent {@link #mergeWindows()} call, or {@code window} itself when no merge result is recorded for it. */ public W mergeResult(W window) { + W result = windowToMergeResult.get(window); + return result == null ?
window : result; + } + + /** Returns the finished set stored for {@code window}; when none exists yet, a new one backed by an empty {@link HashSet} is created, stored and returned. */ private FinishedTriggers getFinishedSet(W window) { + FinishedTriggers finishedSet = finishedSets.get(window); + if (finishedSet == null) { + finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); + finishedSets.put(window, finishedSet); + } + return finishedSet; + } + + /** Assign context that simply returns the element, timestamp and window it was constructed with. */ private static class TestAssignContext<W extends BoundedWindow> + extends WindowFn<Object, W>.AssignContext { + private Object element; + private Instant timestamp; + private BoundedWindow window; + + public TestAssignContext( + WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { + /* The superclass is an inner class of WindowFn, so its constructor is invoked qualified by the enclosing windowFn instance. */ windowFn.super(); + this.element = element; + this.timestamp = timestamp; + this.window = window; + } + + @Override + public Object element() { + return element; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + @Override + public BoundedWindow window() { + return window; + } + } + + /** {@link Timers} implementation that forwards every call to {@code timerInternals}; timers are set and deleted in the single namespace given at construction, which must be a {@link WindowNamespace}. */ private class TestTimers implements Timers { + private final StateNamespace namespace; + + public TestTimers(StateNamespace namespace) { + checkArgument(namespace instanceof WindowNamespace); + this.namespace = namespace; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public Instant currentProcessingTime() { + return timerInternals.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timerInternals.currentSynchronizedProcessingTime(); + } + + @Override + public Instant currentEventTime() { + /* Event time is reported as the current input watermark time. */ return timerInternals.currentInputWatermarkTime(); + } + } +}