Repository: samza Updated Branches: refs/heads/master a980c9622 -> 00543804b
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java deleted file mode 100644 index f6b3ff8..0000000 --- a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.data.IncomingSystemMessage; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.task.StreamOperatorTask; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - - -/** - * Example implementation of unique key-based stream-stream join tasks - * - */ -public class JoinOperatorTask implements StreamOperatorTask { - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessage extends InputJsonSystemMessage<MessageType> { - - JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { - super(key, data, offset, timestamp, partition); - } - } - - MessageStream<JsonMessage> joinOutput = null; - - @Override public void initOperators(Collection<SystemMessageStream> sources) { - sources.forEach(source -> { - MessageStream<JsonMessage> newSource = source.map(this::getInputMessage); - if (joinOutput == null) { - joinOutput = newSource; - } else { - joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2)); - } - }); - } - - private JsonMessage getInputMessage(IncomingSystemMessage ism) { - return new JsonMessage( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getTimestamp(), - ism.getSystemStreamPartition()); - } - - JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessage(m1.getMessage().joinKey, newJoinMsg, null, m1.getTimestamp(), null); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java deleted file mode 100644 index 47d6b3a..0000000 --- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.task; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.impl.ChainedOperators; -import org.apache.samza.operators.task.StreamOperatorTask; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.Partition; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import java.lang.reflect.Field; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; - - -public class TestStreamOperatorAdaptorTask { - Field userTaskField = null; - Field chainedOpsField = null; - - @Before public void prep() throws NoSuchFieldException { - userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask"); - chainedOpsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - userTaskField.setAccessible(true); - chainedOpsField.setAccessible(true); - } - - - @Test public void testConstructor() throws IllegalAccessException { - StreamOperatorTask userTask = mock(StreamOperatorTask.class); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); - StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask); - Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); - assertEquals(taskMemberVar, userTask); - assertTrue(chainsMap.isEmpty()); - } - - @Test public void testInit() throws Exception { - StreamOperatorTask userTask = mock(StreamOperatorTask.class); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - Set<SystemStreamPartition> testInputs = new HashSet() { { - this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0))); - this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1))); - } }; - when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs); - adaptorTask.init(mockConfig, mockContext); - verify(userTask, times(1)).initOperators(Mockito.anyCollection()); - Map<SystemStreamPartition, ChainedOperators> chainsMap = (Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask); - assertTrue(chainsMap.size() == 2); - assertTrue(chainsMap.containsKey(testInputs.toArray()[0])); - assertTrue(chainsMap.containsKey(testInputs.toArray()[1])); - } - - // TODO: window and process methods to be added after implementation of ChainedOperators.create() -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java deleted file mode 100644 index 44efa6d..0000000 --- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.task; - -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.operators.impl.ChainedOperators; -import org.apache.samza.system.SystemStreamPartition; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -/** - * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask} - */ -public class TestStreamOperatorTasks { - - private final WindowOperatorTask userTask = new WindowOperatorTask(); - - private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask(); - - private final JoinOperatorTask joinTask = new JoinOperatorTask(); - - private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { - for (int i = 0; i < 4; i++) { - this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i))); - } - } }; - - @Test public void testUserTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, ChainedOperators> pipelineMap = - (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } - - @Test public void testSplitTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, ChainedOperators> pipelineMap = - (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } - - @Test public void testJoinTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask); - Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains"); - pipelineMapFld.setAccessible(true); - Map<SystemStreamPartition, ChainedOperators> pipelineMap = - (Map<SystemStreamPartition, ChainedOperators>) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - assertEquals(pipelineMap.size(), 4); - this.inputPartitions.forEach(partition -> { - assertNotNull(pipelineMap.get(partition)); - }); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java deleted file mode 100644 index de7bba5..0000000 --- a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task; - -import org.apache.samza.operators.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.TriggerBuilder; -import org.apache.samza.operators.Windows; -import org.apache.samza.operators.data.Message; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.task.StreamOperatorTask; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.Collection; - - -/** - * Example implementation of a simple user-defined tasks w/ window operators - * - */ -public class WindowOperatorTask implements StreamOperatorTask { - class MessageType { - String field1; - String field2; - } - - class JsonMessage extends InputJsonSystemMessage<MessageType> { - - JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) { - super(key, data, offset, timestamp, partition); - } - } - - @Override public void initOperators(Collection<SystemMessageStream> sources) { - sources.forEach(source -> - source.map(m1 -> - new JsonMessage( - this.myMessageKeyFunction(m1), - (MessageType) m1.getMessage(), - m1.getOffset(), - m1.getTimestamp(), - m1.getSystemStreamPartition())). - window( - Windows.<JsonMessage, String>intoSessionCounter( - m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). - setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100). - addTimeoutSinceLastMessage(30000))) - ); - } - - String myMessageKeyFunction(Message<Object, Object> m) { - return m.getKey().toString(); - } - -}