SAMZA-1361: OperatorImplGraph is using wrong keys to store/retrieve OperatorImpl in the map
Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apachce.org> Closes #248 from prateekm/operatorimpl-key and squashes the following commits: e733e9d3 [Prateek Maheshwari] Dummy commit to trigger jenkins build. 5a16f162 [Prateek Maheshwari] Updated with Yi's feedback. 8eb2c5df [Prateek Maheshwari] SAMZA-1361: OperatorImplGraph is using wrong keys to store/retrieve OperatorImpl in the map Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c2b55dc3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c2b55dc3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c2b55dc3 Branch: refs/heads/0.14.0 Commit: c2b55dc3962674f2117a795aa210f9f04557ebbe Parents: c27a253 Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Thu Jul 27 15:13:33 2017 -0700 Committer: Jagadish <jagad...@apache.org> Committed: Thu Jul 27 15:13:33 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/operators/StreamGraphImpl.java | 4 ++-- .../samza/operators/impl/OperatorImpl.java | 4 ++-- .../samza/operators/impl/OperatorImplGraph.java | 5 ++-- .../samza/operators/TestJoinOperator.java | 2 +- .../operators/impl/TestOperatorImplGraph.java | 25 +++++++++++++++++++- 5 files changed, 31 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c2b55dc3/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index c0da1b2..2c2eb56 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -170,13 +170,13 
@@ public class StreamGraphImpl implements StreamGraph { /** * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl} * - * @return a set of all available {@link OperatorSpec}s + * @return all available {@link OperatorSpec}s */ public Collection<OperatorSpec> getAllOperatorSpecs() { Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values(); Set<OperatorSpec> operatorSpecs = new HashSet<>(); - for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) { + operatorSpecs.add(inputOperatorSpec); doGetOperatorSpecs(inputOperatorSpec, operatorSpecs); } return operatorSpecs; http://git-wip-us.apache.org/repos/asf/samza/blob/c2b55dc3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 73bb83d..8dd5acd 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -186,13 +186,13 @@ public abstract class OperatorImpl<M, RM> { protected abstract OperatorSpec<M, RM> getOperatorSpec(); /** - * Get the name for this {@link OperatorImpl}. + * Get the unique name for this {@link OperatorImpl} in the DAG. * * Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are * 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an * implementation specific name, e.g., for use in metrics. 
* - * @return the operator name + * @return the unique name for this {@link OperatorImpl} in the DAG */ protected String getOperatorName() { return getOperatorSpec().getOpName(); http://git-wip-us.apache.org/repos/asf/samza/blob/c2b55dc3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index e5fce13..99496eb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -129,7 +129,7 @@ public class OperatorImplGraph { */ OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, Config config, TaskContext context) { - if (!operatorImpls.containsKey(operatorSpec) || operatorSpec instanceof JoinOperatorSpec) { + if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) { // Either this is the first time we've seen this operatorSpec, or this is a join operator spec // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG. OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context); @@ -145,7 +145,7 @@ public class OperatorImplGraph { } else { // the implementation corresponding to operatorSpec has already been instantiated // and registered, so we do not need to traverse the DAG further. 
- return operatorImpls.get(operatorSpec); + return operatorImpls.get(operatorSpec.getOpName()); } } @@ -179,7 +179,6 @@ public class OperatorImplGraph { private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec, JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) { Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec); - if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true, partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock); http://git-wip-us.apache.org/repos/asf/samza/blob/c2b55dc3/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 0c41fb8..c51b1ea 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -71,7 +71,7 @@ public class TestJoinOperator { } @Test - public void testJoinFnInitAndClose() throws Exception { + public void joinFnInitAndClose() throws Exception { TestJoinFunction joinFn = new TestJoinFunction(); StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(joinFn)); assertEquals(1, joinFn.getNumInitCalls()); http://git-wip-us.apache.org/repos/asf/samza/blob/c2b55dc3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index b2c7722..4505eef 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -41,6 +41,7 @@ import org.junit.Test; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.BiFunction; import java.util.function.Function; @@ -57,6 +58,7 @@ import static org.mockito.Mockito.when; public class TestOperatorImplGraph { + @Test public void testEmptyChain() { StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class)); OperatorImplGraph opGraph = @@ -101,7 +103,6 @@ public class TestOperatorImplGraph { assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode()); } - @Test public void testBroadcastChain() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); @@ -126,6 +127,28 @@ public class TestOperatorImplGraph { } @Test + public void testMergeChain() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class)); + MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); + MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); + MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2)); + MapFunction mockMapFunction = mock(MapFunction.class); + mergedStream.map(mockMapFunction); + + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + OperatorImplGraph opImplGraph = 
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); + + // verify that the DAG after merge is only traversed & initialized once + verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContext.class)); + } + + @Test public void testJoinChain() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));