Repository: samza Updated Branches: refs/heads/master 65af13df1 -> 4bf8ab6eb
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java deleted file mode 100644 index 0d720dd..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.triggers; - - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import junit.framework.Assert; -import org.apache.samza.Partition; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.windows.AccumulationMode; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Before; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.function.Function; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestWindowOperator { - private final MessageCollector messageCollector = mock(MessageCollector.class); - private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); - private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>(); - private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); - private Config config; - private TaskContext taskContext; - private ApplicationRunner runner; - - @Before - public void setup() throws Exception { - windowPanes.clear(); - - config = mock(Config.class); - taskContext = mock(TaskContext.class); - runner = mock(ApplicationRunner.class); - when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet - .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); - - } - - @Test - public void testTumblingWindowsDiscardingMode() throws Exception { - - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); - testClock.advanceTime(Duration.ofSeconds(1)); - - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 5); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size(), 1); - } - - @Test - public void testTumblingWindowsAccumulatingMode() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 7); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); - - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 4); - - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 4); - } - - @Test - public void testSessionWindowsDiscardingMode() throws Exception { - StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 1); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2); - - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 4); - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); - Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2); - - } - - @Test - public void testSessionWindowsAccumulatingMode() throws Exception { - StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 4); - } - - @Test - public void testCancelationOfOnceTrigger() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2)); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 1); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); - - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 1); - - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); - Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); - - task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofSeconds(1)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); - Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 1); - - } - - @Test - public void testCancelationOfAnyTrigger() throws Exception { - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), - Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); - //assert that the count trigger fired - Assert.assertEquals(windowPanes.size(), 1); - - //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger - testClock.advanceTime(Duration.ofMillis(500)); - - //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger - Assert.assertEquals(windowPanes.size(), 1); - - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); - - //advance timer by 500 more millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - - //assert that the default trigger fired - Assert.assertEquals(windowPanes.size(), 2); - Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); - Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 5); - - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); - - //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - - Assert.assertEquals(windowPanes.size(), 3); - Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); - Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); - - //advance timer by > 500 millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(900)); - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 4); - Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); - Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); - Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); - } - - @Test - public void testCancelationOfRepeatingNestedTriggers() throws Exception { - - StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), - Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); - TestClock testClock = new TestClock(); - StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); - task.init(config, taskContext); - - task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); - - task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); - //assert that the count trigger fired - Assert.assertEquals(windowPanes.size(), 1); - - //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - testClock.advanceTime(Duration.ofMillis(500)); - //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger - task.window(messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 2); - - task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); - task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); - Assert.assertEquals(windowPanes.size(), 3); - - task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); - //advance timer by 500 more millis to enable the default trigger - testClock.advanceTime(Duration.ofMillis(500)); - task.window(messageCollector, taskCoordinator); - //assert that the default trigger fired - Assert.assertEquals(windowPanes.size(), 4); - } - - private class KeyedTumblingWindowStreamApplication implements StreamApplication { - - private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); - private final AccumulationMode mode; - private final Duration duration; - private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger; - - KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) { - this.mode = mode; - this.duration = timeDuration; - this.earlyTrigger = earlyTrigger; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null); - Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); - inStream - .map(m -> m) - .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode)) - .map(m -> { - windowPanes.add(m); - return m; - }); - } - } - - private class KeyedSessionWindowStreamApplication implements StreamApplication { - - private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); - private final AccumulationMode mode; - private final Duration duration; - - KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { - this.mode = mode; - this.duration = duration; - } - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null); - Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey(); - - inStream - .map(m -> m) - .window(Windows.keyedSessionWindow(keyFn, duration) - .setAccumulationMode(mode)) - .map(m -> { - windowPanes.add(m); - return m; - }); - } - } - - private class IntegerMessageEnvelope extends IncomingMessageEnvelope { - IntegerMessageEnvelope(int key, int msg) { - super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java new file mode 100644 index 0000000..710ebda --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.apache.samza.util.Clock; + +import java.time.Duration; + +/** + * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration. + * Used for testing. + */ +public class TestClock implements Clock { + + long currentTime = 1; + + public void advanceTime(Duration duration) { + currentTime += duration.toMillis(); + } + + public void advanceTime(long millis) { + currentTime += millis; + } + + @Override + public long currentTimeMillis() { + return currentTime; + } +}