http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java new file mode 100644 index 0000000..0ffbbca --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.BitSetCoder; +import org.apache.beam.sdk.util.Timers; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.joda.time.Instant; + +/** + * Executes a trigger while managing persistence of information about which subtriggers are + * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger. + * + * <p>Specifically, the responsibilities are: + * + * <ul> + * <li>Invoking the trigger's methods via its {@link ExecutableTriggerStateMachine} wrapper by + * constructing the appropriate trigger contexts.</li> + * <li>Committing a record of which subtriggers are finished to persistent state.</li> + * <li>Restoring the record of which subtriggers are finished from persistent state.</li> + * <li>Clearing out the persisted finished set when a caller indicates + * (via {#link #clearFinished}) that it is no longer needed.</li> + * </ul> + * + * <p>These responsibilities are intertwined: trigger contexts include mutable information about + * which subtriggers are finished. This class provides the information when building the contexts + * and commits the information when the method of the {@link ExecutableTriggerStateMachine} returns. + * + * @param <W> The kind of windows being processed. + */ +public class TriggerStateMachineRunner<W extends BoundedWindow> { + @VisibleForTesting + static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG = + StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); + + private final ExecutableTriggerStateMachine rootTrigger; + private final TriggerStateMachineContextFactory<W> contextFactory; + + public TriggerStateMachineRunner( + ExecutableTriggerStateMachine rootTrigger, + TriggerStateMachineContextFactory<W> contextFactory) { + checkState(rootTrigger.getTriggerIndex() == 0); + this.rootTrigger = rootTrigger; + this.contextFactory = contextFactory; + } + + private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) { + if (!isFinishedSetNeeded()) { + // If no trigger in the tree will ever have finished bits, then we don't need to read them. + // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not + // finished) for each trigger in the tree. + return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); + } + + BitSet bitSet = state.read(); + return bitSet == null + ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) + : FinishedTriggersBitSet.fromBitSet(bitSet); + } + + + private void clearFinishedBits(ValueState<BitSet> state) { + if (!isFinishedSetNeeded()) { + // Nothing to clear. + return; + } + state.clear(); + } + + /** Return true if the trigger is closed in the window corresponding to the specified state. */ + public boolean isClosed(StateAccessor<?> state) { + return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); + } + + public void prefetchForValue(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnElement( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + public void prefetchOnFire(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); + } + + public void prefetchShouldFire(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchShouldFire( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + /** + * Run the trigger logic to deal with a new value. + */ + public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state) + throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + TriggerStateMachine.OnElementContext triggerContext = contextFactory.createOnElementContext( + window, timers, timestamp, rootTrigger, finishedSet); + rootTrigger.invokeOnElement(triggerContext); + persistFinishedSet(state, finishedSet); + } + + public void prefetchForMerge( + W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) { + if (isFinishedSetNeeded()) { + for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) { + value.readLater(); + } + } + rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor( + window, mergingWindows, rootTrigger)); + } + + /** + * Run the trigger merging logic as part of executing the specified merge. + */ + public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + + // And read the finished bits in each merging window. + ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder(); + for (Map.Entry<W, ValueState<BitSet>> entry : + state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) { + // Don't need to clone these, since the trigger context doesn't allow modification + builder.put(entry.getKey(), readFinishedBits(entry.getValue())); + // Clear the underlying finished bits. + clearFinishedBits(entry.getValue()); + } + ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build(); + + TriggerStateMachine.OnMergeContext mergeContext = contextFactory.createOnMergeContext( + window, timers, rootTrigger, finishedSet, mergingFinishedSets); + + // Run the merge from the trigger + rootTrigger.invokeOnMerge(mergeContext); + + persistFinishedSet(state, finishedSet); + } + + public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception { + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + return rootTrigger.invokeShouldFire(context); + } + + public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + rootTrigger.invokeOnFire(context); + persistFinishedSet(state, finishedSet); + } + + private void persistFinishedSet( + StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) { + if (!isFinishedSetNeeded()) { + return; + } + + ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG); + if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { + if (modifiedFinishedSet.getBitSet().isEmpty()) { + finishedSetState.clear(); + } else { + finishedSetState.write(modifiedFinishedSet.getBitSet()); + } + } + } + + /** + * Clear the finished bits. + */ + public void clearFinished(StateAccessor<?> state) { + clearFinishedBits(state.access(FINISHED_BITS_TAG)); + } + + /** + * Clear the state used for executing triggers, but leave the finished set to indicate + * the window is closed. + */ + public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception { + // Don't need to clone, because we'll be clearing the finished bits anyways. + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)); + rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet)); + } + + private boolean isFinishedSetNeeded() { + // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the + // lookup. Right now, we special case this for the DefaultTrigger. + return !(rootTrigger.getSpec() instanceof DefaultTriggerStateMachine); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java new file mode 100644 index 0000000..b7c7050 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * State machine implementations for triggers, called "triggers" because + * they react to events. + */ +package org.apache.beam.runners.core.triggers; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java deleted file mode 100644 index b591229..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link AfterAll}. - */ -@RunWith(JUnit4.class) -public class AfterAllTest { - - private SimpleTriggerTester<IntervalWindow> tester; - - @Test - public void testT1FiresFirst() throws Exception { - tester = TriggerTester.forTrigger( - AfterAll.of( - AfterPane.elementCountAtLeast(1), - AfterPane.elementCountAtLeast(2)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testT2FiresFirst() throws Exception { - tester = TriggerTester.forTrigger( - AfterAll.of( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(1)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that the AfterAll properly unsets finished bits when a merge causing it to become - * unfinished. - */ - @Test - public void testOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterAll.of( - AfterWatermark.pastEndOfWindow(), - AfterPane.elementCountAtLeast(1)), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - - // Finish the AfterAll in the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merge them; the AfterAll should not be finished - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.isMarkedFinished(mergedWindow)); - - // Confirm that we are back on the first trigger by probing that it is not ready to fire - // after an element (with merging) - tester.injectElements(3); - tester.mergeWindows(); - assertFalse(tester.shouldFire(mergedWindow)); - - // Fire the AfterAll in the merged window - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - - // Confirm that we are on the second trigger by probing - tester.injectElements(2); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - tester.injectElements(2); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - } - - @Test - public void testFireDeadline() throws Exception { - BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1)) - .getWatermarkThatGuaranteesFiring(window)); - } - - @Test - public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterAll = AfterAll.of(trigger1, trigger2); - assertEquals( - AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), - afterAll.getContinuationTrigger()); - } - - @Test - public void testToString() { - Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2")); - assertEquals("AfterAll.of(t1, t2)", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java deleted file mode 100644 index c413c6e..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link AfterEach}. - */ -@RunWith(JUnit4.class) -public class AfterEachTest { - - private SimpleTriggerTester<IntervalWindow> tester; - - @Before - public void initMocks() { - MockitoAnnotations.initMocks(this); - } - - /** - * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second. - */ - @Test - public void testAfterEachInSequence() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)) - .orFinally(AfterPane.elementCountAtLeast(3)), - Repeatedly.forever(AfterPane.elementCountAtLeast(5)) - .orFinally(AfterWatermark.pastEndOfWindow())), - FixedWindows.of(Duration.millis(10))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - // AfterCount(2) not ready - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - // AfterCount(2) ready, not finished - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // orFinally(AfterCount(3)) ready and will finish the first - tester.injectElements(1, 2, 3); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // Now running as the second trigger - assertFalse(tester.shouldFire(window)); - // This quantity of elements would fire and finish if it were erroneously still the first - tester.injectElements(1, 2, 3, 4); - assertFalse(tester.shouldFire(window)); - - // Now fire once - tester.injectElements(5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // This time advance the watermark to finish the whole mess. - tester.advanceInputWatermark(new Instant(10)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testFireDeadline() throws Exception { - BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - assertEquals(new Instant(9), - AfterEach.inOrder(AfterWatermark.pastEndOfWindow(), - AfterPane.elementCountAtLeast(4)) - .getWatermarkThatGuaranteesFiring(window)); - - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()) - .getWatermarkThatGuaranteesFiring(window)); - } - - @Test - public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterEach = AfterEach.inOrder(trigger1, trigger2); - assertEquals( - Repeatedly.forever(AfterFirst.of( - trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), - afterEach.getContinuationTrigger()); - } - - @Test - public void testToString() { - Trigger trigger = AfterEach.inOrder( - StubTrigger.named("t1"), - StubTrigger.named("t2"), - StubTrigger.named("t3")); - - assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java deleted file mode 100644 index 415060b..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link AfterFirst}. - */ -@RunWith(JUnit4.class) -public class AfterFirstTest { - - @Mock private OnceTrigger mockTrigger1; - @Mock private OnceTrigger mockTrigger2; - private SimpleTriggerTester<IntervalWindow> tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.<Trigger.TriggerContext>any(); - } - - @Before - public void initMocks() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testNeitherShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); - - assertFalse(tester.shouldFire(window)); // should not fire - assertFalse(tester.isMarkedFinished(window)); // not finished - } - - @Test - public void testOnlyT1ShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); - - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testOnlyT2ShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); // now finished - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testBothShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that if the first trigger rewinds to be non-finished in the merged window, - * then it becomes the currently active trigger again, with real triggers. - */ - @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterFirst.of(AfterPane.elementCountAtLeast(5), - AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - // Finished the AfterFirst in the first window - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Set up second window where it is not done - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now adding 3 more makes the AfterFirst ready to fire - tester.injectElements(1, 2, 3, 4, 5); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testFireDeadline() throws Exception { - BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - assertEquals(new Instant(9), - AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4)) - .getWatermarkThatGuaranteesFiring(window)); - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1)) - .getWatermarkThatGuaranteesFiring(window)); - } - - @Test - public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterFirst = AfterFirst.of(trigger1, trigger2); - assertEquals( - AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), - afterFirst.getContinuationTrigger()); - } - - @Test - public void testToString() { - Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2")); - assertEquals("AfterFirst.of(t1, t2)", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java deleted file mode 100644 index 38d030e..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link AfterPane}. - */ -@RunWith(JUnit4.class) -public class AfterPaneTest { - - SimpleTriggerTester<IntervalWindow> tester; - /** - * Tests that the trigger does fire when enough elements are in a window, and that it only - * fires that window (no leakage). - */ - @Test - public void testAfterPaneElementCountFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1); // [0, 10) - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); // [0, 10) - tester.injectElements(11); // [10, 20) - - assertTrue(tester.shouldFire(window)); // ready to fire - tester.fireIfShouldFire(window); // and finished - assertTrue(tester.isMarkedFinished(window)); - - // But don't finish the other window - assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20)))); - } - - @Test - public void testClear() throws Exception { - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1, 2, 3); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - tester.clearState(window); - tester.assertCleared(window); - } - - @Test - public void testAfterPaneElementCountSessions() throws Exception { - tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements( - 1, // in [1, 11) - 2); // in [2, 12) - - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11)))); - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12)))); - - tester.mergeWindows(); - - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12)); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - assertTrue(tester.isMarkedFinished(mergedWindow)); - - // Because we closed the previous window, we don't have it around to merge with. So there - // will be a new FIRE_AND_FINISH result. - tester.injectElements( - 7, // in [7, 17) - 9); // in [9, 19) - - tester.mergeWindows(); - - IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19)); - assertTrue(tester.shouldFire(newMergedWindow)); - tester.fireIfShouldFire(newMergedWindow); - assertTrue(tester.isMarkedFinished(newMergedWindow)); - } - - @Test - public void testFireDeadline() throws Exception { - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring( - new IntervalWindow(new Instant(0), new Instant(10)))); - } - - @Test - public void testContinuation() throws Exception { - assertEquals( - AfterPane.elementCountAtLeast(1), - AfterPane.elementCountAtLeast(100).getContinuationTrigger()); - assertEquals( - AfterPane.elementCountAtLeast(1), - AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger()); - } - - @Test - public void testToString() { - Trigger trigger = AfterPane.elementCountAtLeast(5); - assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java deleted file mode 100644 index 13a7acf..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests the {@link AfterProcessingTime}. - */ -@RunWith(JUnit4.class) -public class AfterProcessingTimeTest { - - /** - * Tests the basic property that the trigger does wait for processing time to be - * far enough advanced. - */ - @Test - public void testAfterProcessingTimeFixedWindows() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - - // Timer at 15 - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); - tester.advanceProcessingTime(new Instant(12)); - assertFalse(tester.shouldFire(firstWindow)); - - // Load up elements in the next window, timer at 17 for them - tester.injectElements(11, 12, 13); - IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); - assertFalse(tester.shouldFire(secondWindow)); - - // Not quite time to fire - tester.advanceProcessingTime(new Instant(14)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first - tester.injectElements(2, 3); - - // Advance past the first timer and fire, finishing the first window - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.isMarkedFinished(firstWindow)); - - // The next window fires and finishes now - tester.advanceProcessingTime(new Instant(18)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(secondWindow); - assertTrue(tester.isMarkedFinished(secondWindow)); - } - - /** - * Tests that when windows merge, if the trigger is waiting for "N millis after the first - * element" that it is relative to the earlier of the two merged windows. - */ - @Test - public void testClear() throws Exception { - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1, 2, 3); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - tester.clearState(window); - tester.assertCleared(window); - } - - @Test - public void testAfterProcessingTimeWithMergingWindow() throws Exception { - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.advanceProcessingTime(new Instant(10)); - tester.injectElements(1); // in [1, 11), timer for 15 - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.advanceProcessingTime(new Instant(12)); - tester.injectElements(3); // in [3, 13), timer for 17 - IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); - assertFalse(tester.shouldFire(secondWindow)); - - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); - - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testFireDeadline() throws Exception { - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring( - new IntervalWindow(new Instant(0), new Instant(10)))); - } - - @Test - public void testContinuation() throws Exception { - OnceTrigger firstElementPlus1 = - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)); - assertEquals( - new AfterSynchronizedProcessingTime(), - firstElementPlus1.getContinuationTrigger()); - } - - /** - * Basic test of compatibility check between identical triggers. - */ - @Test - public void testCompatibilityIdentical() throws Exception { - Trigger t1 = AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(1L)); - Trigger t2 = AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(1L)); - assertTrue(t1.isCompatible(t2)); - } - - @Test - public void testToString() { - Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); - assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString()); - } - - @Test - public void testWithDelayToString() { - Trigger trigger = AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(5)); - - assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)", - trigger.toString()); - } - - @Test - public void testBuiltUpToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow() - .withLateFirings(AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(10))); - - String expected = "AfterWatermark.pastEndOfWindow()" - + ".withLateFirings(AfterProcessingTime" - + ".pastFirstElementInPane()" - + ".plusDelayOf(10 minutes))"; - - assertEquals(expected, trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java deleted file mode 100644 index 7e6e938..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests the {@link AfterSynchronizedProcessingTime}. - */ -@RunWith(JUnit4.class) -public class AfterSynchronizedProcessingTimeTest { - - private Trigger underTest = new AfterSynchronizedProcessingTime(); - - @Test - public void testAfterProcessingTimeWithFixedWindows() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - - // Timer at 15 - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); - tester.advanceProcessingTime(new Instant(12)); - assertFalse(tester.shouldFire(firstWindow)); - - // Load up elements in the next window, timer at 17 for them - tester.injectElements(11, 12, 13); - IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); - assertFalse(tester.shouldFire(secondWindow)); - - // Not quite time to fire - tester.advanceProcessingTime(new Instant(14)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first - tester.injectElements(2, 3); - - // Advance past the first timer and fire, finishing the first window - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.isMarkedFinished(firstWindow)); - - // The next window fires and finishes now - tester.advanceProcessingTime(new Instant(18)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(secondWindow); - assertTrue(tester.isMarkedFinished(secondWindow)); - } - - @Test - public void testAfterProcessingTimeWithMergingWindow() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - Sessions.withGapDuration(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - tester.injectElements(1); // in [1, 11), timer for 15 - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.advanceProcessingTime(new Instant(12)); - tester.injectElements(3); // in [3, 13), timer for 17 - IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); - assertFalse(tester.shouldFire(secondWindow)); - - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); - - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testFireDeadline() throws Exception { - assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, - underTest.getWatermarkThatGuaranteesFiring( - new IntervalWindow(new Instant(0), new Instant(10)))); - } - - @Test - public void testContinuation() throws Exception { - assertEquals(underTest, underTest.getContinuationTrigger()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java deleted file mode 100644 index 084027b..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java +++ /dev/null @@ -1,380 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests the {@link AfterWatermark} triggers. - */ -@RunWith(JUnit4.class) -public class AfterWatermarkTest { - - @Mock private OnceTrigger mockEarly; - @Mock private OnceTrigger mockLate; - - private SimpleTriggerTester<IntervalWindow> tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.<Trigger.TriggerContext>any(); - } - private static Trigger.OnElementContext anyElementContext() { - return Mockito.<Trigger.OnElementContext>any(); - } - - private void injectElements(int... elements) throws Exception { - for (int element : elements) { - doNothing().when(mockEarly).onElement(anyElementContext()); - doNothing().when(mockLate).onElement(anyElementContext()); - tester.injectElements(element); - } - } - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) - throws Exception { - - // Don't fire due to mock saying no - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); - assertFalse(tester.shouldFire(window)); // not ready - - // Fire due to mock trigger; early trigger is required to be a OnceTrigger - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // ready - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - } - - @Test - public void testEarlyAndAtWatermark() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(mockEarly), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - testRunningAsTrigger(mockEarly, window); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testAtWatermarkAndLate() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withLateFirings(mockLate), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // No early firing, just double checking - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true); - assertFalse(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - testRunningAsTrigger(mockLate, window); - } - - @Test - public void testEarlyAndAtWatermarkAndLate() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(mockEarly) - .withLateFirings(mockLate), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - testRunningAsTrigger(mockEarly, window); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - testRunningAsTrigger(mockLate, window); - } - - /** - * Tests that if the EOW is finished in both as well as the merged window, then - * it is finished in the merged result. - * - * <p>Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testOnMergeAlreadyFinished() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.injectElements(1); - tester.injectElements(5); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Merging should leave it finished - tester.mergeWindows(); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that the trigger rewinds to be non-finished in the merged window. - * - * <p>Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merging should re-activate the watermark trigger in the merged window - tester.mergeWindows(); - - // Confirm that we are not on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertFalse(tester.shouldFire(mergedWindow)); - - // And confirm that advancing the watermark fires again - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that if the EOW is finished in both as well as the merged window, then - * it is finished in the merged result. - * - * <p>Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.injectElements(1); - tester.injectElements(5); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Merging should leave it on the late trigger - tester.mergeWindows(); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that the trigger rewinds to be non-finished in the merged window. - * - * <p>Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testEarlyAndLateOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merging should re-activate the early trigger in the merged window - tester.mergeWindows(); - - // Confirm that we are not on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertFalse(tester.shouldFire(mergedWindow)); - - // And confirm that advancing the watermark fires again - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testFromEndOfWindowToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow(); - assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); - } - - @Test - public void testEarlyFiringsToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1")); - - assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString()); - } - - @Test - public void testLateFiringsToString() { - Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1")); - - assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); - } - - @Test - public void testEarlyAndLateFiringsToString() { - Trigger trigger = - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(StubTrigger.named("t1")) - .withLateFirings(StubTrigger.named("t2")); - - assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", - trigger.toString()); - } - - @Test - public void testToStringExcludesNeverTrigger() { - Trigger trigger = - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(Never.ever()) - .withLateFirings(Never.ever()); - - assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java deleted file mode 100644 index 673e555..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests the {@link DefaultTrigger}, which should be equivalent to - * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. - */ -@RunWith(JUnit4.class) -public class DefaultTriggerTest { - - SimpleTriggerTester<IntervalWindow> tester; - - @Test - public void testDefaultTriggerFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - FixedWindows.of(Duration.millis(100))); - - tester.injectElements( - 1, // [0, 100) - 101); // [100, 200) - - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200)); - - // Advance the watermark almost to the end of the first window. - tester.advanceInputWatermark(new Instant(99)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Advance watermark past end of the first window, which is then ready - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Fire, but the first window is still allowed to fire - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Advance watermark to 200, then both are ready - tester.advanceInputWatermark(new Instant(200)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - - assertFalse(tester.isMarkedFinished(firstWindow)); - assertFalse(tester.isMarkedFinished(secondWindow)); - } - - @Test - public void testDefaultTriggerSlidingWindows() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); - - tester.injectElements( - 1, // [-50, 50), [0, 100) - 50); // [0, 100), [50, 150) - - IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100)); - IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150)); - - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 50, the first becomes ready; it stays ready after firing - tester.advanceInputWatermark(new Instant(50)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 99, the first is still the only one ready - tester.advanceInputWatermark(new Instant(99)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 100, the first and second are ready - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - tester.fireIfShouldFire(firstWindow); - - assertFalse(tester.isMarkedFinished(firstWindow)); - assertFalse(tester.isMarkedFinished(secondWindow)); - assertFalse(tester.isMarkedFinished(thirdWindow)); - } - - @Test - public void testDefaultTriggerSessions() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - Sessions.withGapDuration(Duration.millis(100))); - - tester.injectElements( - 1, // [1, 101) - 50); // [50, 150) - tester.mergeWindows(); - - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150)); - - // Not ready in any window yet - tester.advanceInputWatermark(new Instant(100)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(mergedWindow)); - - // The first window is "ready": the caller owns knowledge of which windows are merged away - tester.advanceInputWatermark(new Instant(149)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now ready on all windows - tester.advanceInputWatermark(new Instant(150)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - assertTrue(tester.shouldFire(mergedWindow)); - - // Ensure it repeats - tester.fireIfShouldFire(mergedWindow); - assertTrue(tester.shouldFire(mergedWindow)); - - assertFalse(tester.isMarkedFinished(mergedWindow)); - } - - @Test - public void testFireDeadline() throws Exception { - assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring( - new IntervalWindow(new Instant(0), new Instant(10)))); - assertEquals(GlobalWindow.INSTANCE.maxTimestamp(), - DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE)); - } - - @Test - public void testContinuation() throws Exception { - assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java deleted file mode 100644 index 1e3a1ff..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ExecutableTrigger}. - */ -@RunWith(JUnit4.class) -public class ExecutableTriggerTest { - - @Test - public void testIndexAssignmentLeaf() throws Exception { - StubTrigger t1 = new StubTrigger(); - ExecutableTrigger executable = ExecutableTrigger.create(t1); - assertEquals(0, executable.getTriggerIndex()); - } - - @Test - public void testIndexAssignmentOneLevel() throws Exception { - StubTrigger t1 = new StubTrigger(); - StubTrigger t2 = new StubTrigger(); - StubTrigger t = new StubTrigger(t1, t2); - - ExecutableTrigger executable = ExecutableTrigger.create(t); - - assertEquals(0, executable.getTriggerIndex()); - assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); - assertSame(t1, executable.subTriggers().get(0).getSpec()); - assertEquals(2, executable.subTriggers().get(1).getTriggerIndex()); - assertSame(t2, executable.subTriggers().get(1).getSpec()); - } - - @Test - public void testIndexAssignmentTwoLevel() throws Exception { - StubTrigger t11 = new StubTrigger(); - StubTrigger t12 = new StubTrigger(); - StubTrigger t13 = new StubTrigger(); - StubTrigger t14 = new StubTrigger(); - StubTrigger t21 = new StubTrigger(); - StubTrigger t22 = new StubTrigger(); - StubTrigger t1 = new StubTrigger(t11, t12, t13, t14); - StubTrigger t2 = new StubTrigger(t21, t22); - StubTrigger t = new StubTrigger(t1, t2); - - ExecutableTrigger executable = ExecutableTrigger.create(t); - - assertEquals(0, executable.getTriggerIndex()); - assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); - assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree()); - assertEquals(6, executable.subTriggers().get(1).getTriggerIndex()); - - assertSame(t1, executable.getSubTriggerContaining(1).getSpec()); - assertSame(t2, executable.getSubTriggerContaining(6).getSpec()); - assertSame(t1, executable.getSubTriggerContaining(2).getSpec()); - assertSame(t1, executable.getSubTriggerContaining(3).getSpec()); - assertSame(t1, executable.getSubTriggerContaining(5).getSpec()); - assertSame(t2, executable.getSubTriggerContaining(7).getSpec()); - } - - private static class StubTrigger extends Trigger { - - @SafeVarargs - protected StubTrigger(Trigger... subTriggers) { - super(Arrays.asList(subTriggers)); - } - - @Override - public void onElement(OnElementContext c) throws Exception { } - - @Override - public void onMerge(OnMergeContext c) throws Exception { } - - @Override - public void clear(TriggerContext c) throws Exception { - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public boolean isCompatible(Trigger other) { - return false; - } - - @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public boolean shouldFire(TriggerContext c) { - return false; - } - - @Override - public void onFire(TriggerContext c) { } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java deleted file mode 100644 index 7f74620..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FinishedTriggersBitSet}. - */ -@RunWith(JUnit4.class) -public class FinishedTriggersBitSetTest { - /** - * Tests that after a trigger is set to finished, it reads back as finished. - */ - @Test - public void testSetGet() { - FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1)); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - @Test - public void testClearRecursively() { - FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1)); - } - - @Test - public void testCopy() throws Exception { - FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10); - assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java deleted file mode 100644 index a66f74f..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; - -/** - * Generalized tests for {@link FinishedTriggers} implementations. - */ -public class FinishedTriggersProperties { - /** - * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set - * finished, it is correctly reported as finished. - */ - public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertFalse(finishedSet.isFinished(trigger)); - finishedSet.setFinished(trigger, true); - assertTrue(finishedSet.isFinished(trigger)); - } - - /** - * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly - * reported as finished. - */ - public static void verifyGetAfterSet(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( - AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), - AfterAll.of( - AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); - - verifyGetAfterSet(finishedSet, trigger); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1)); - verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0)); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - public static void verifyClearRecursively(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( - AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), - AfterAll.of( - AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); - - // Set them all finished. This method is not on a trigger as it makes no sense outside tests. - setFinishedRecursively(finishedSet, trigger); - assertTrue(finishedSet.isFinished(trigger)); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0))); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0))); - assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1))); - - // Clear just the second AfterAll - finishedSet.clearRecursively(trigger.subTriggers().get(1)); - - // Check that the first and all that are still finished - assertTrue(finishedSet.isFinished(trigger)); - verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0)); - verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1)); - } - - private static void setFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - finishedSet.setFinished(trigger, true); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - setFinishedRecursively(finishedSet, subTrigger); - } - } - - private static void verifyFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertTrue(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - verifyFinishedRecursively(finishedSet, subTrigger); - } - } - - private static void verifyUnfinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { - assertFalse(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - verifyUnfinishedRecursively(finishedSet, subTrigger); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java deleted file mode 100644 index 072d264..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import java.util.HashSet; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FinishedTriggersSet}. - */ -@RunWith(JUnit4.class) -public class FinishedTriggersSetTest { - /** - * Tests that after a trigger is set to finished, it reads back as finished. - */ - @Test - public void testSetGet() { - FinishedTriggersProperties.verifyGetAfterSet( - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - @Test - public void testClearRecursively() { - FinishedTriggersProperties.verifyClearRecursively( - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); - } - - @Test - public void testCopy() throws Exception { - FinishedTriggersSet finishedSet = - FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); - assertThat(finishedSet.copy().getFinishedTriggers(), - not(theInstance(finishedSet.getFinishedTriggers()))); - } -}