Repository: samza
Updated Branches:
  refs/heads/master 0feb5c2da -> 49cf06fc5
SAMZA-1202; Multiple calls to getInputStream() result in non-deterministic behavior Author: vjagadish1989 <jvenk...@linkedin.com> Reviewers: Prateek Maheshwari<prate...@linkedin.com> Closes #136 from vjagadish1989/samza-1202 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/49cf06fc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/49cf06fc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/49cf06fc Branch: refs/heads/master Commit: 49cf06fc5a7db4ad1718bfc2d110db6f48d4f180 Parents: 0feb5c2 Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Mon Apr 24 17:15:47 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Apr 24 17:15:47 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/operators/StreamGraph.java | 8 +++-- .../org/apache/samza/system/StreamSpec.java | 15 ++++++++++ .../apache/samza/operators/StreamGraphImpl.java | 11 +++++++ .../apache/samza/task/StreamOperatorTask.java | 7 +++-- .../samza/operators/TestStreamGraphImpl.java | 31 ++++++++++++++++++++ 5 files changed, 68 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index a03f7c3..d299068 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -30,7 +30,8 @@ import java.util.function.Function; public interface StreamGraph { /** - * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}. 
+ * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}. Multiple invocations of + * this method with the same {@code streamId} will throw an {@link IllegalStateException} * * @param streamId the unique logical ID for the stream * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message @@ -39,11 +40,13 @@ public interface StreamGraph { * @param <V> the type of message in the incoming message * @param <M> the type of message in the input {@link MessageStream} * @return the input {@link MessageStream} + * @throws IllegalStateException when invoked multiple times with the same {@code streamId} */ <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder); /** - * Gets the {@link OutputStream} corresponding to the logical {@code streamId}. + * Gets the {@link OutputStream} corresponding to the logical {@code streamId}. Multiple invocations of + * this method with the same {@code streamId} will throw an {@link IllegalStateException} * * @param streamId the unique logical ID for the stream * @param keyExtractor the {@link Function} to extract the outgoing key from the output message @@ -52,6 +55,7 @@ public interface StreamGraph { * @param <V> the type of message in the outgoing message * @param <M> the type of message in the {@link OutputStream} * @return the output {@link MessageStream} + * @throws IllegalStateException when invoked multiple times with the same {@code streamId} */ <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId, Function<? super M, ? extends K> keyExtractor, Function<? super M, ? 
extends V> msgExtractor); http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index 5711a8b..237bedd 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -201,4 +201,19 @@ public class StreamSpec { throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue)); } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || !getClass().equals(o.getClass())) return false; + + StreamSpec that = (StreamSpec) o; + + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index 86ce6a4..5ba390a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -65,6 +65,11 @@ public class StreamGraphImpl implements StreamGraph { if (msgBuilder == null) { throw new IllegalArgumentException("msgBuilder can't be null for an input stream"); } + + if (inStreams.containsKey(runner.getStreamSpec(streamId))) { + throw new IllegalStateException("Cannot invoke getInputStream() multiple times with the same streamId: " + streamId); + } + return 
inStreams.computeIfAbsent(runner.getStreamSpec(streamId), streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, (BiFunction<K, V, M>) msgBuilder)); } @@ -75,9 +80,15 @@ public class StreamGraphImpl implements StreamGraph { if (keyExtractor == null) { throw new IllegalArgumentException("keyExtractor can't be null for an output stream."); } + if (msgExtractor == null) { throw new IllegalArgumentException("msgExtractor can't be null for an output stream."); } + + if (outStreams.containsKey(runner.getStreamSpec(streamId))) { + throw new IllegalStateException("Cannot invoke getOutputStream() multiple times with the same streamId: " + streamId); + } + return outStreams.computeIfAbsent(runner.getStreamSpec(streamId), streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor)); } http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 73bb53f..be52565 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -23,6 +23,7 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.impl.OperatorImplGraph; +import org.apache.samza.operators.impl.RootOperatorImpl; import org.apache.samza.operators.stream.InputStreamInternal; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; @@ -118,8 +119,10 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo SystemStream systemStream = 
ime.getSystemStreamPartition().getSystemStream(); InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream); // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder. - operatorImplGraph.getRootOperator(systemStream) - .onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator); + RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream); + if (rootOperatorImpl != null) { + rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/49cf06fc/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java index 3ab1a3c..77a8960 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -66,6 +66,37 @@ public class TestStreamGraphImpl { assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1"); } + @Test(expected = IllegalStateException.class) + public void testMultipleGetInputStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + + StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); + StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system"); + StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system"); + + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1); + 
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder = + (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1"); + + //create 2 streams for the corresponding streamIds + MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder); + MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder); + + //assert that the streamGraph contains only the above 2 streams + assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1); + assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2); + assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null); + assertEquals(graph.getInputStreams().size(), 2); + + //should throw IllegalStateException + graph.getInputStream("test-stream-1", xMsgBuilder); + } + + @Test public void testGetOutputStream() { ApplicationRunner mockRunner = mock(ApplicationRunner.class);