Repository: apex-core Updated Branches: refs/heads/master ca1a375f9 -> f63e01d14
APEXCORE-511 add null and empty checks for addOperator, addStream and addModule Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ce4ae51 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ce4ae51 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ce4ae51 Branch: refs/heads/master Commit: 2ce4ae515ddfadddace260839b634cf122653a29 Parents: 077009e Author: Oliver Winke <oli...@datatorrent.com> Authored: Tue Apr 11 17:27:35 2017 -0700 Committer: Oliver Winke <oli...@datatorrent.com> Committed: Wed Apr 12 10:25:17 2017 -0700 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 18 +++-- .../stram/plan/logical/LogicalPlan.java | 29 ++++--- .../stram/plan/logical/LogicalPlanTest.java | 82 ++++++++++++-------- 3 files changed, 79 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 96420a3..93936d7 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -22,6 +22,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Map; +import javax.annotation.Nonnull; + import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Context.DAGContext; @@ -216,7 +218,7 @@ public interface DAG extends DAGContext, Serializable * @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG. * @return Instance of the operator that has been added to the DAG. */ - <T extends Operator> T addOperator(String name, Class<T> clazz); + <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz); /** * <p>addOperator.</p> @@ -225,20 +227,20 @@ public interface DAG extends DAGContext, Serializable * @param operator Instance of the operator that needs to be added to the DAG * @return Instance of the operator that has been added to the DAG. */ - <T extends Operator> T addOperator(String name, T operator); + <T extends Operator> T addOperator(@Nonnull String name, T operator); @InterfaceStability.Evolving - <T extends Module> T addModule(String name, Class<T> moduleClass); + <T extends Module> T addModule(@Nonnull String name, Class<T> moduleClass); @InterfaceStability.Evolving - <T extends Module> T addModule(String name, T module); + <T extends Module> T addModule(@Nonnull String name, T module); /** * <p>addStream.</p> * @param id Identifier of the stream that will be used to identify stream in DAG * @return */ - StreamMeta addStream(String id); + StreamMeta addStream(@Nonnull String id); /** * Add identified stream for given source and sinks. Multiple sinks can be @@ -256,7 +258,7 @@ public interface DAG extends DAGContext, Serializable * @param sinks * @return StreamMeta */ - <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks); + <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks); /** * Overload varargs version to avoid generic array type safety warnings in calling code. @@ -269,12 +271,12 @@ public interface DAG extends DAGContext, Serializable * @param sink1 * @return StreamMeta */ - <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1); + <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1); /** * <p>addStream.</p> */ - <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2); + <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2); /** * <p>setAttribute.</p> http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 62c4fd8..bf4b2cb 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -51,6 +51,7 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import javax.annotation.Nonnull; import javax.validation.ConstraintViolation; import javax.validation.ConstraintViolationException; import javax.validation.Validation; @@ -105,6 +106,9 @@ import com.datatorrent.stram.engine.Slider; import static com.datatorrent.api.Context.PortContext.STREAM_CODEC; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; + /** * DAG contains the logical declarations of operators and streams. * <p> @@ -1233,7 +1237,7 @@ public class LogicalPlan implements Serializable, DAG } @Override - public <T extends Operator> T addOperator(String name, Class<T> clazz) + public <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz) { T instance; try { @@ -1246,18 +1250,20 @@ public class LogicalPlan implements Serializable, DAG } @Override - public <T extends Operator> T addOperator(String name, T operator) + public <T extends Operator> T addOperator(@Nonnull String name, T operator) { + checkArgument(!isNullOrEmpty(name), "operator name is null or empty"); + if (operators.containsKey(name)) { if (operators.get(name).operator == operator) { return operator; } - throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); + throw new IllegalArgumentException("duplicate operator name: " + operators.get(name)); } // Avoid name conflict with module. if (modules.containsKey(name)) { - throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); + throw new IllegalArgumentException("duplicate operator name: " + operators.get(name)); } OperatorMeta decl = new OperatorMeta(name, operator); rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator @@ -1347,16 +1353,18 @@ public class LogicalPlan implements Serializable, DAG } @Override - public <T extends Module> T addModule(String name, T module) + public <T extends Module> T addModule(@Nonnull String name, T module) { + checkArgument(!isNullOrEmpty(name), "module name is null or empty"); + if (modules.containsKey(name)) { if (modules.get(name).module == module) { return module; } - throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); + throw new IllegalArgumentException("duplicate module name: " + modules.get(name)); } if (operators.containsKey(name)) { - throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); + throw new IllegalArgumentException("duplicate module name: " + modules.get(name)); } ModuleMeta meta = new ModuleMeta(name, module); @@ -1365,7 +1373,7 @@ public class LogicalPlan implements Serializable, DAG } @Override - public <T extends Module> T addModule(String name, Class<T> clazz) + public <T extends Module> T addModule(@Nonnull String name, Class<T> clazz) { T instance; try { @@ -1399,8 +1407,9 @@ public class LogicalPlan implements Serializable, DAG } @Override - public StreamMeta addStream(String id) + public StreamMeta addStream(@Nonnull String id) { + checkArgument(!isNullOrEmpty(id),"stream id is null or empty"); StreamMeta s = new StreamMeta(id); StreamMeta o = streams.put(id, s); if (o == null) { @@ -1412,7 +1421,7 @@ public class LogicalPlan implements Serializable, DAG @Override @SuppressWarnings("unchecked") - public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks) + public <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks) { StreamMeta s = addStream(id); s.setSource(source); http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index 1507e2d..9f68a4f 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -42,6 +42,7 @@ import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import com.esotericsoftware.kryo.DefaultSerializer; @@ -60,6 +61,7 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Module; import com.datatorrent.api.Operator; import com.datatorrent.api.Partitioner; import com.datatorrent.api.Sink; @@ -85,15 +87,21 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class LogicalPlanTest { + private LogicalPlan dag; + + @Before + public void setUp() + { + dag = new LogicalPlan(); + } @Test public void testCycleDetection() { - LogicalPlan dag = new LogicalPlan(); - //NodeConf operator1 = b.getOrAddNode("operator1"); GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class); GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class); @@ -145,8 +153,6 @@ public class LogicalPlanTest @Test public void testCycleDetectionWithDelay() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class); GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); @@ -192,7 +198,6 @@ public class LogicalPlanTest @Test public void testLogicalPlanSerialization() throws Exception { - LogicalPlan dag = new LogicalPlan(); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class); @@ -231,7 +236,6 @@ public class LogicalPlanTest @Test public void testDeleteOperator() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -359,7 +363,6 @@ public class LogicalPlanTest Assert.assertEquals("", "intField1", cv.getPropertyPath().toString()); // ensure DAG validation produces matching results - LogicalPlan dag = new LogicalPlan(); bean = dag.addOperator("testOperator", bean); try { @@ -435,7 +438,6 @@ public class LogicalPlanTest @Test public void testValidationForNonInputRootOperator() { - LogicalPlan dag = new LogicalPlan(); NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator()); try { dag.validate(); @@ -463,7 +465,6 @@ public class LogicalPlanTest @Test public void testOperatorAnnotation() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class); TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class); dag.addStream("Connection", input.outport, operator.input1); @@ -502,11 +503,49 @@ public class LogicalPlanTest } } + @Test(expected = IllegalArgumentException.class) + public void testNullOperatorName() + { + dag.addOperator(null, BaseOperator.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyOperatorName() + { + dag.addOperator("", BaseOperator.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullStreamId() + { + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + dag.addStream(null, o1.outport1, o1.inport1, o1.inport2 ); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyStreamId() + { + GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); + dag.addStream("", o1.outport1, o1.inport1 ); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyModuleName() + { + Module testModule = mock(Module.class); + dag.addModule("", testModule); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullModuleName() + { + Module testModule = mock(Module.class); + dag.addModule(null, testModule); + } + @Test public void testPortConnectionValidation() { - LogicalPlan dag = new LogicalPlan(); - TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class); try { @@ -534,8 +573,6 @@ public class LogicalPlanTest @Test public void testAtMostOnceProcessingModeValidation() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class); @@ -560,14 +597,11 @@ public class LogicalPlanTest OperatorMeta outputOperOm = dag.getMeta(outputOper); Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE)); - } @Test public void testExactlyOnceProcessingModeValidation() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class); @@ -604,8 +638,6 @@ public class LogicalPlanTest @Test public void testLocalityValidation() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL); @@ -668,7 +700,6 @@ public class LogicalPlanTest @Test public void testOutputPortAnnotation() { - LogicalPlan dag = new LogicalPlan(); TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator()); try { @@ -757,7 +788,6 @@ public class LogicalPlanTest @Test public void testJdkSerializableOperator() throws Exception { - LogicalPlan dag = new LogicalPlan(); dag.addOperator("o1", new JdkSerializableOperator()); ByteArrayOutputStream outStream = new ByteArrayOutputStream(); @@ -772,7 +802,6 @@ public class LogicalPlanTest @Test public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException { - LogicalPlan dag = new LogicalPlan(); Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String()); Field nameField = Attribute.class.getDeclaredField("name"); nameField.setAccessible(true); @@ -892,7 +921,6 @@ public class LogicalPlanTest /* @Test public void testStreamCodec() throws Exception { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class); GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class); StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1); @@ -946,7 +974,6 @@ public class LogicalPlanTest @Test public void testCheckpointableWithinAppWindowAnnotation() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); dag.addStream("Stream1", input1.outport, x.inport1); @@ -998,7 +1025,6 @@ public class LogicalPlanTest @Test public void testInputPortHiding() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); Operator2 operator2 = dag.addOperator("operator2", new Operator2()); dag.addStream("Stream1", input1.outport, operator2.input); @@ -1008,7 +1034,6 @@ public class LogicalPlanTest @Test public void testInvalidInputPortConnection() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); Operator1 operator1 = dag.addOperator("operator3", new Operator3()); dag.addStream("Stream1", input1.outport, operator1.input); @@ -1024,7 +1049,6 @@ public class LogicalPlanTest @Test public void testAffinityRulesDagValidation() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("O1", new TestGeneratorInputOperator()); GenericTestOperator o2 = dag.addOperator("O2", new GenericTestOperator()); GenericTestOperator o3 = dag.addOperator("O3", new GenericTestOperator()); @@ -1183,7 +1207,6 @@ public class LogicalPlanTest @Test public void testOutputPortHiding() { - LogicalPlan dag = new LogicalPlan(); Operator5 operator5 = dag.addOperator("input", new Operator5()); Operator2 operator2 = dag.addOperator("operator2", new Operator2()); dag.addStream("Stream1", operator5.output, operator2.input); @@ -1193,7 +1216,6 @@ public class LogicalPlanTest @Test(expected = ValidationException.class) public void testInvalidOutputPortConnection() { - LogicalPlan dag = new LogicalPlan(); Operator4 operator4 = dag.addOperator("input", new Operator5()); Operator3 operator3 = dag.addOperator("operator3", new Operator3()); dag.addStream("Stream1", operator4.output, operator3.input); @@ -1245,8 +1267,6 @@ public class LogicalPlanTest @Test public void testInvalidInputOperatorDeclaration() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator()); GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class); @@ -1264,8 +1284,6 @@ public class LogicalPlanTest @Test public void testValidInputOperatorDeclaration() { - LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator()); GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);