Repository: samza
Updated Branches:
  refs/heads/master a980c9622 -> 00543804b


http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
deleted file mode 100644
index f6b3ff8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.data.IncomingSystemMessage;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class JoinOperatorTask implements StreamOperatorTask {
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessage extends InputJsonSystemMessage<MessageType> {
-
-    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
-      super(key, data, offset, timestamp, partition);
-    }
-  }
-
-  MessageStream<JsonMessage> joinOutput = null;
-
-  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
-    sources.forEach(source -> {
-        MessageStream<JsonMessage> newSource = 
source.map(this::getInputMessage);
-        if (joinOutput == null) {
-          joinOutput = newSource;
-        } else {
-          joinOutput = joinOutput.join(newSource, (m1, m2) -> 
this.myJoinResult(m1, m2));
-        }
-      });
-  }
-
-  private JsonMessage getInputMessage(IncomingSystemMessage ism) {
-    return new JsonMessage(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getTimestamp(),
-        ism.getSystemStreamPartition());
-  }
-
-  JsonMessage myJoinResult(JsonMessage m1, JsonMessage m2) {
-    MessageType newJoinMsg = new MessageType();
-    newJoinMsg.joinKey = m1.getKey();
-    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-    return new JsonMessage(m1.getMessage().joinKey, newJoinMsg, null, 
m1.getTimestamp(), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
deleted file mode 100644
index 47d6b3a..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.ChainedOperators;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.Partition;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
-
-
-public class TestStreamOperatorAdaptorTask {
-  Field userTaskField = null;
-  Field chainedOpsField = null;
-
-  @Before public void prep() throws NoSuchFieldException {
-    userTaskField = 
StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
-    chainedOpsField = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    userTaskField.setAccessible(true);
-    chainedOpsField.setAccessible(true);
-  }
-
-
-  @Test public void testConstructor() throws IllegalAccessException {
-    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
-    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(userTask);
-    StreamOperatorTask taskMemberVar = (StreamOperatorTask) 
userTaskField.get(adaptorTask);
-    Map<SystemStreamPartition, ChainedOperators> chainsMap = 
(Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask);
-    assertEquals(taskMemberVar, userTask);
-    assertTrue(chainsMap.isEmpty());
-  }
-
-  @Test public void testInit() throws Exception {
-    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
-    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(userTask);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    Set<SystemStreamPartition> testInputs = new HashSet() { {
-        this.add(new SystemStreamPartition("test-sys", "test-strm", new 
Partition(0)));
-        this.add(new SystemStreamPartition("test-sys", "test-strm", new 
Partition(1)));
-      } };
-    when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
-    adaptorTask.init(mockConfig, mockContext);
-    verify(userTask, times(1)).initOperators(Mockito.anyCollection());
-    Map<SystemStreamPartition, ChainedOperators> chainsMap = 
(Map<SystemStreamPartition, ChainedOperators>) chainedOpsField.get(adaptorTask);
-    assertTrue(chainsMap.size() == 2);
-    assertTrue(chainsMap.containsKey(testInputs.toArray()[0]));
-    assertTrue(chainsMap.containsKey(testInputs.toArray()[1]));
-  }
-
-  // TODO: window and process methods to be added after implementation of 
ChainedOperators.create()
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
 
b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
deleted file mode 100644
index 44efa6d..0000000
--- 
a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.ChainedOperators;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask}
- */
-public class TestStreamOperatorTasks {
-
-  private final WindowOperatorTask userTask = new WindowOperatorTask();
-
-  private final BroadcastOperatorTask splitTask = new BroadcastOperatorTask();
-
-  private final JoinOperatorTask joinTask = new JoinOperatorTask();
-
-  private final Set<SystemStreamPartition> inputPartitions = new 
HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", "my-topic1", new 
Partition(i)));
-      }
-    } };
-
-  @Test public void testUserTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.userTask);
-    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, ChainedOperators> pipelineMap =
-        (Map<SystemStreamPartition, ChainedOperators>) 
pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-
-  @Test public void testSplitTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.splitTask);
-    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, ChainedOperators> pipelineMap =
-        (Map<SystemStreamPartition, ChainedOperators>) 
pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-
-  @Test public void testJoinTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    
when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new 
StreamOperatorAdaptorTask(this.joinTask);
-    Field pipelineMapFld = 
StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, ChainedOperators> pipelineMap =
-        (Map<SystemStreamPartition, ChainedOperators>) 
pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
deleted file mode 100644
index de7bba5..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.TriggerBuilder;
-import org.apache.samza.operators.Windows;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collection;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class WindowOperatorTask implements StreamOperatorTask {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  class JsonMessage extends InputJsonSystemMessage<MessageType> {
-
-    JsonMessage(String key, MessageType data, Offset offset, long timestamp, 
SystemStreamPartition partition) {
-      super(key, data, offset, timestamp, partition);
-    }
-  }
-
-  @Override public void initOperators(Collection<SystemMessageStream> sources) 
{
-    sources.forEach(source ->
-      source.map(m1 ->
-        new JsonMessage(
-          this.myMessageKeyFunction(m1),
-          (MessageType) m1.getMessage(),
-          m1.getOffset(),
-          m1.getTimestamp(),
-          m1.getSystemStreamPartition())).
-        window(
-          Windows.<JsonMessage, String>intoSessionCounter(
-              m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
-            setTriggers(TriggerBuilder.<JsonMessage, 
Integer>earlyTriggerWhenExceedWndLen(100).
-              addTimeoutSinceLastMessage(30000)))
-    );
-  }
-
-  String myMessageKeyFunction(Message<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}

Reply via email to