Repository: incubator-beam Updated Branches: refs/heads/master 71c69b31b -> 6d686288e
Restore trigger-related tests missed in #1083 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d43e8aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d43e8aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d43e8aa Branch: refs/heads/master Commit: 8d43e8aa7ccb154e17d6840c25c7a72684c615aa Parents: 71c69b3 Author: Kenneth Knowles <k...@google.com> Authored: Tue Oct 18 10:11:37 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Oct 18 11:00:47 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/util/ExecutableTriggerTest.java | 127 +++++++++++++++++++ .../sdk/util/FinishedTriggersBitSetTest.java | 55 ++++++++ .../sdk/util/FinishedTriggersProperties.java | 110 ++++++++++++++++ .../beam/sdk/util/FinishedTriggersSetTest.java | 60 +++++++++ 4 files changed, 352 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java new file mode 100644 index 0000000..1e3a1ff --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ExecutableTrigger} index assignment and sub-trigger lookup. + */ +@RunWith(JUnit4.class) +public class ExecutableTriggerTest { + + @Test + public void testIndexAssignmentLeaf() throws Exception { + StubTrigger t1 = new StubTrigger(); + ExecutableTrigger executable = ExecutableTrigger.create(t1); + assertEquals(0, executable.getTriggerIndex()); + } + + @Test + public void testIndexAssignmentOneLevel() throws Exception { + StubTrigger t1 = new StubTrigger(); + StubTrigger t2 = new StubTrigger(); + StubTrigger t = new StubTrigger(t1, t2); + + ExecutableTrigger executable = ExecutableTrigger.create(t); + + assertEquals(0, executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertSame(t1, executable.subTriggers().get(0).getSpec()); + assertEquals(2, executable.subTriggers().get(1).getTriggerIndex()); + assertSame(t2, executable.subTriggers().get(1).getSpec()); + } + + @Test + public void testIndexAssignmentTwoLevel() throws Exception { + StubTrigger t11 = new StubTrigger(); + StubTrigger t12 = new StubTrigger(); + StubTrigger t13 = new StubTrigger(); + StubTrigger t14 = new StubTrigger(); + 
StubTrigger t21 = new StubTrigger(); + StubTrigger t22 = new StubTrigger(); + StubTrigger t1 = new StubTrigger(t11, t12, t13, t14); + StubTrigger t2 = new StubTrigger(t21, t22); + StubTrigger t = new StubTrigger(t1, t2); + + ExecutableTrigger executable = ExecutableTrigger.create(t); + + assertEquals(0, executable.getTriggerIndex()); + assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); + assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree()); + assertEquals(6, executable.subTriggers().get(1).getTriggerIndex()); + + assertSame(t1, executable.getSubTriggerContaining(1).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(6).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(2).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(3).getSpec()); + assertSame(t1, executable.getSubTriggerContaining(5).getSpec()); + assertSame(t2, executable.getSubTriggerContaining(7).getSpec()); + } + + private static class StubTrigger extends Trigger { + + @SafeVarargs + protected StubTrigger(Trigger... 
subTriggers) { + super(Arrays.asList(subTriggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { } + + @Override + public void onMerge(OnMergeContext c) throws Exception { } + + @Override + public void clear(TriggerContext c) throws Exception { + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean isCompatible(Trigger other) { + return false; + } + + @Override + public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public boolean shouldFire(TriggerContext c) { + return false; + } + + @Override + public void onFire(TriggerContext c) { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java new file mode 100644 index 0000000..7f74620 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersBitSet}. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersBitSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. + */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. 
+ */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1)); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10); + assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java new file mode 100644 index 0000000..a66f74f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; + +/** + * Generalized tests for {@link FinishedTriggers} implementations. + */ +public class FinishedTriggersProperties { + /** + * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set + * finished, it is correctly reported as finished. + */ + public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertFalse(finishedSet.isFinished(trigger)); + finishedSet.setFinished(trigger, true); + assertTrue(finishedSet.isFinished(trigger)); + } + + /** + * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly + * reported as finished. + */ + public static void verifyGetAfterSet(FinishedTriggers finishedSet) { + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), + AfterAll.of( + AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); + + verifyGetAfterSet(finishedSet, trigger); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1)); + verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0)); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. 
+ */ + public static void verifyClearRecursively(FinishedTriggers finishedSet) { + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), + AfterAll.of( + AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); + + // Set them all finished. This method is not on a trigger as it makes no sense outside tests. + setFinishedRecursively(finishedSet, trigger); + assertTrue(finishedSet.isFinished(trigger)); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0))); + assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1))); + + // Clear just the second AfterAll + finishedSet.clearRecursively(trigger.subTriggers().get(1)); + + // Check that the root and the first subtree are still finished, and the cleared one is not + assertTrue(finishedSet.isFinished(trigger)); + verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0)); + verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1)); + } + + private static void setFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + finishedSet.setFinished(trigger, true); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + setFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyFinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertTrue(finishedSet.isFinished(trigger)); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + verifyFinishedRecursively(finishedSet, subTrigger); + } + } + + private static void verifyUnfinishedRecursively( + FinishedTriggers finishedSet, ExecutableTrigger trigger) { + assertFalse(finishedSet.isFinished(trigger)); + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + verifyUnfinishedRecursively(finishedSet, subTrigger); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java new file mode 100644 index 0000000..072d264 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import java.util.HashSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FinishedTriggersSet}. + */ +@RunWith(JUnit4.class) +public class FinishedTriggersSetTest { + /** + * Tests that after a trigger is set to finished, it reads back as finished. 
+ */ + @Test + public void testSetGet() { + FinishedTriggersProperties.verifyGetAfterSet( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); + } + + /** + * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no + * others. + */ + @Test + public void testClearRecursively() { + FinishedTriggersProperties.verifyClearRecursively( + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>())); + } + + @Test + public void testCopy() throws Exception { + FinishedTriggersSet finishedSet = + FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()); + assertThat(finishedSet.copy().getFinishedTriggers(), + not(theInstance(finishedSet.getFinishedTriggers()))); + } +}