Repository: samza
Updated Branches:
  refs/heads/master 0feb5c2da -> 49cf06fc5


SAMZA-1202; Multiple calls to getInputStream() result in non-deterministic 
behavior

Author: vjagadish1989 <jvenk...@linkedin.com>

Reviewers: Prateek Maheshwari<prate...@linkedin.com>

Closes #136 from vjagadish1989/samza-1202


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/49cf06fc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/49cf06fc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/49cf06fc

Branch: refs/heads/master
Commit: 49cf06fc5a7db4ad1718bfc2d110db6f48d4f180
Parents: 0feb5c2
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Mon Apr 24 17:15:47 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Apr 24 17:15:47 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/operators/StreamGraph.java |  8 +++--
 .../org/apache/samza/system/StreamSpec.java     | 15 ++++++++++
 .../apache/samza/operators/StreamGraphImpl.java | 11 +++++++
 .../apache/samza/task/StreamOperatorTask.java   |  7 +++--
 .../samza/operators/TestStreamGraphImpl.java    | 31 ++++++++++++++++++++
 5 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java 
b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index a03f7c3..d299068 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -30,7 +30,8 @@ import java.util.function.Function;
 public interface StreamGraph {
 
   /**
-   * Gets the input {@link MessageStream} corresponding to the logical {@code 
streamId}.
+   * Gets the input {@link MessageStream} corresponding to the logical {@code 
streamId}. Multiple invocations of
+   * this method with the same {@code streamId} will throw an {@link 
IllegalStateException}.
    *
    * @param streamId the unique logical ID for the stream
    * @param msgBuilder the {@link BiFunction} to convert the incoming key and 
message to a message
@@ -39,11 +40,13 @@ public interface StreamGraph {
    * @param <V> the type of message in the incoming message
    * @param <M> the type of message in the input {@link MessageStream}
    * @return the input {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
    */
   <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? 
super K, ? super V, ? extends M> msgBuilder);
 
   /**
-   * Gets the {@link OutputStream} corresponding to the logical {@code 
streamId}.
+   * Gets the {@link OutputStream} corresponding to the logical {@code 
streamId}. Multiple invocations of
+   * this method with the same {@code streamId} will throw an {@link 
IllegalStateException}.
    *
    * @param streamId the unique logical ID for the stream
    * @param keyExtractor the {@link Function} to extract the outgoing key from 
the output message
@@ -52,6 +55,7 @@ public interface StreamGraph {
    * @param <V> the type of message in the outgoing message
    * @param <M> the type of message in the {@link OutputStream}
    * @return the output {@link MessageStream}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@code streamId}
    */
   <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
       Function<? super M, ? extends K> keyExtractor, Function<? super M, ? 
extends V> msgExtractor);

http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java 
b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 5711a8b..237bedd 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -201,4 +201,19 @@ public class StreamSpec {
       throw new IllegalArgumentException(String.format("Identifier '%s' is 
'%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, 
identifierValue));
     }
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || !getClass().equals(o.getClass())) return false;
+
+    StreamSpec that = (StreamSpec) o;
+
+    return id.equals(that.id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 86ce6a4..5ba390a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -65,6 +65,11 @@ public class StreamGraphImpl implements StreamGraph {
     if (msgBuilder == null) {
       throw new IllegalArgumentException("msgBuilder can't be null for an 
input stream");
     }
+
+    if (inStreams.containsKey(runner.getStreamSpec(streamId))) {
+      throw new IllegalStateException("Cannot invoke getInputStream() multiple 
times with the same streamId: " + streamId);
+    }
+
     return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
         streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, 
(BiFunction<K, V, M>) msgBuilder));
   }
@@ -75,9 +80,15 @@ public class StreamGraphImpl implements StreamGraph {
     if (keyExtractor == null) {
       throw new IllegalArgumentException("keyExtractor can't be null for an 
output stream.");
     }
+
     if (msgExtractor == null) {
       throw new IllegalArgumentException("msgExtractor can't be null for an 
output stream.");
     }
+
+    if (outStreams.containsKey(runner.getStreamSpec(streamId))) {
+      throw new IllegalStateException("Cannot invoke getOutputStream() 
multiple times with the same streamId: " + streamId);
+    }
+
     return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
         streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, 
(Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 73bb53f..be52565 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -23,6 +23,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.operators.impl.RootOperatorImpl;
 import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -118,8 +119,10 @@ public final class StreamOperatorTask implements 
StreamTask, InitableTask, Windo
     SystemStream systemStream = 
ime.getSystemStreamPartition().getSystemStream();
     InputStreamInternal inputStream = 
inputSystemStreamToInputStream.get(systemStream);
     // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on 
the serde before applying the msgBuilder.
-    operatorImplGraph.getRootOperator(systemStream)
-        .onNext(inputStream.getMsgBuilder().apply(ime.getKey(), 
ime.getMessage()), collector, coordinator);
+    RootOperatorImpl rootOperatorImpl = 
operatorImplGraph.getRootOperator(systemStream);
+    if (rootOperatorImpl != null) {
+      rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), 
ime.getMessage()), collector, coordinator);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 3ab1a3c..77a8960 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -66,6 +66,37 @@ public class TestStreamGraphImpl {
     assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), 
"input-id-1");
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleGetInputStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+
+    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", 
"physical-stream-1", "test-system");
+    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", 
"physical-stream-2", "test-system");
+    StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", 
"physical-stream-1", "test-system");
+
+    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+    
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
+        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), 
v.getEventTime(), "input-id-1");
+
+    //create 2 streams for the corresponding streamIds
+    MessageStream<TestInputMessageEnvelope> inputStream1 = 
graph.getInputStream("test-stream-1", xMsgBuilder);
+    MessageStream<TestInputMessageEnvelope> inputStream2 = 
graph.getInputStream("test-stream-2", xMsgBuilder);
+
+    //assert that the streamGraph contains only the above 2 streams
+    assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
+    assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
+    assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
+    assertEquals(graph.getInputStreams().size(), 2);
+
+    //should throw IllegalStateException
+    graph.getInputStream("test-stream-1", xMsgBuilder);
+  }
+
+
   @Test
   public void testGetOutputStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);

Reply via email to