Repository: apex-core Updated Branches: refs/heads/master 74f732a79 -> abc836ca1
APEXCORE-604 extend DAG API to get operators and streams from the DAG. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/abc836ca Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/abc836ca Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/abc836ca Branch: refs/heads/master Commit: abc836ca18d63a895d431dc5acf4d3a1018b235d Parents: 74f732a Author: Tushar R. Gosavi <tus...@apache.org> Authored: Tue Jan 17 12:11:12 2017 +0530 Committer: Tushar R. Gosavi <tus...@apache.org> Committed: Thu Feb 16 14:06:38 2017 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 71 ++++++++++++++++++++ .../stram/StreamingContainerAgent.java | 2 +- .../stram/StreamingContainerManager.java | 4 +- .../stram/codec/LogicalPlanSerializer.java | 9 ++- .../stram/plan/logical/LogicalPlan.java | 62 +++++++++++------ .../stram/plan/physical/PTOperator.java | 2 +- .../stram/plan/physical/PhysicalPlan.java | 20 +++--- .../stram/plan/physical/PlanModifier.java | 2 +- .../stram/plan/physical/StreamMapping.java | 4 +- .../com/datatorrent/stram/StreamCodecTest.java | 2 +- .../logical/LogicalPlanConfigurationTest.java | 10 +-- .../logical/module/TestModuleExpansion.java | 2 +- 12 files changed, 142 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index b80bc93..532ff72 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -19,6 +19,8 @@ package com.datatorrent.api; import java.io.Serializable; +import java.util.Collection; +import java.util.Map; import org.apache.hadoop.classification.InterfaceStability; @@ -41,11 +43,26 @@ public interface DAG extends DAGContext, Serializable { interface InputPortMeta extends Serializable, PortContext { + /** + * Return port object represented by this InputPortMeta + * @return + */ + public Operator.InputPort<?> getPort(); + + public <T extends OperatorMeta> T getOperatorMeta(); } interface OutputPortMeta extends Serializable, PortContext { OperatorMeta getUnifierMeta(); + + /** + * Return port object represented by this OutputPortMeta + * @return + */ + public Operator.OutputPort<?> getPort(); + + public <T extends OperatorMeta> T getOperatorMeta(); } /** @@ -143,6 +160,19 @@ public interface DAG extends DAGContext, Serializable */ public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist); + /** + * Return source of the stream. + * @param <T> + * @return + */ + public <T extends OutputPortMeta> T getSource(); + + /** + * Return all sinks connected to this stream. + * @param <T> + * @return + */ + public <T extends InputPortMeta> Collection<T> getSinks(); } /** @@ -157,6 +187,22 @@ public interface DAG extends DAGContext, Serializable public InputPortMeta getMeta(Operator.InputPort<?> port); public OutputPortMeta getMeta(Operator.OutputPort<?> port); + + /** + * Return collection of stream which are connected to this operator's + * input ports. + * @param <T> + * @return + */ + public <K extends InputPortMeta, V extends StreamMeta> Map<K, V> getInputStreams(); + + /** + * Return collection of stream which are connected to this operator's + * output ports. + * @param <T> + * @return + */ + public <K extends OutputPortMeta, V extends StreamMeta> Map<K, V> getOutputStreams(); } /** @@ -281,6 +327,31 @@ public interface DAG extends DAGContext, Serializable public abstract OperatorMeta getMeta(Operator operator); /** + * Return all operators present in the DAG. + * @param <T> + * @return + */ + public <T extends OperatorMeta> Collection<T> getAllOperatorsMeta(); + + /** + * Get all input operators in the DAG. This method returns operators which are + * not connected to any upstream operator. i.e the operators which do not have + * any input ports or operators which is not connected through any input ports + * in the DAG. + * + * @param <T> + * @return list of {@see OperatorMeta} for root operators in the DAG. + */ + public <T extends OperatorMeta> Collection<T> getRootOperatorsMeta(); + + /** + * Returns all Streams present in the DAG. + * @param <T> + * @return + */ + public <T extends StreamMeta> Collection<T> getAllStreamsMeta(); + + /** * Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG * needs to implement this interface. */ http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 1d0897d..eb7aa43 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -350,7 +350,7 @@ public class StreamingContainerAgent StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC); if (codec == null) { // it cannot be this object that gets returned. Depending on this value is dangerous - codec = inputPortMeta.getPortObject().getStreamCodec(); + codec = inputPortMeta.getPort().getStreamCodec(); if (codec != null) { // don't create codec multiple times - it will assign a new identifier inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec); http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 00a406c..dfbc7d1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -706,7 +706,7 @@ public class StreamingContainerManager implements PlanContext appDataSource.setQueryOperatorName(queryOperatorName); appDataSource.setQueryTopic(queryTopic); appDataSource.setQueryUrl(convertAppDataUrl(queryUrl)); - List<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks(); + Collection<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks(); if (sinks.isEmpty()) { LOG.warn("There is no result operator for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName()); continue; @@ -715,7 +715,7 @@ public class StreamingContainerManager implements PlanContext LOG.warn("There are multiple result operators for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName()); continue; } - OperatorMeta resultOperatorMeta = sinks.get(0).getOperatorWrapper(); + OperatorMeta resultOperatorMeta = sinks.iterator().next().getOperatorMeta(); if (resultOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) { AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator(); appDataSource.setResultOperatorName(resultOperatorMeta.getName()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java index 6607321..b1d1fd8 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import javax.ws.rs.Produces; @@ -203,11 +202,11 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> sourcePortDetailMap.put("portName", sourcePortName); streamDetailMap.put("name", streamName); streamDetailMap.put("source", sourcePortDetailMap); - List<InputPortMeta> sinks = streamMeta.getSinks(); + Collection<InputPortMeta> sinks = streamMeta.getSinks(); ArrayList<HashMap<String, Object>> sinkPortList = new ArrayList<>(); for (InputPortMeta sinkPort : sinks) { HashMap<String, Object> sinkPortDetailMap = new HashMap<>(); - sinkPortDetailMap.put("operatorName", sinkPort.getOperatorWrapper().getName()); + sinkPortDetailMap.put("operatorName", sinkPort.getOperatorMeta().getName()); sinkPortDetailMap.put("portName", sinkPort.getPortName()); sinkPortList.add(sinkPortDetailMap); } @@ -257,14 +256,14 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> for (StreamMeta streamMeta : allStreams) { String streamKey = LogicalPlanConfiguration.STREAM_PREFIX + streamMeta.getName(); OutputPortMeta source = streamMeta.getSource(); - List<InputPortMeta> sinks = streamMeta.getSinks(); + Collection<InputPortMeta> sinks = streamMeta.getSinks(); props.setProperty(streamKey + "." + LogicalPlanConfiguration.STREAM_SOURCE, source.getOperatorMeta().getName() + "." + source.getPortName()); String sinksValue = ""; for (InputPortMeta sink : sinks) { if (!sinksValue.isEmpty()) { sinksValue += ","; } - sinksValue += sink.getOperatorWrapper().getName() + "." + sink.getPortName(); + sinksValue += sink.getOperatorMeta().getName() + "." + sink.getPortName(); } props.setProperty(streamKey + "." + LogicalPlanConfiguration.STREAM_SINKS, sinksValue); if (streamMeta.getLocality() != null) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 1371ce8..e1debbb 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -242,7 +242,8 @@ public class LogicalPlan implements Serializable, DAG //This is null when port is not hidden private Class<?> classDeclaringHiddenPort; - public OperatorMeta getOperatorWrapper() + @Override + public OperatorMeta getOperatorMeta() { return operatorMeta; } @@ -252,7 +253,8 @@ public class LogicalPlan implements Serializable, DAG return fieldName; } - public InputPort<?> getPortObject() + @Override + public InputPort<?> getPort() { for (Map.Entry<InputPort<?>, InputPortMeta> e : operatorMeta.getPortMapping().inPortMap.entrySet()) { if (e.getValue() == this) { @@ -328,6 +330,7 @@ public class LogicalPlan implements Serializable, DAG this.attributes = new DefaultAttributeMap(); } + @Override public OperatorMeta getOperatorMeta() { return operatorMeta; @@ -363,7 +366,8 @@ public class LogicalPlan implements Serializable, DAG return fieldName; } - public OutputPort<?> getPortObject() + @Override + public OutputPort<?> getPort() { for (Map.Entry<OutputPort<?>, OutputPortMeta> e : operatorMeta.getPortMapping().outPortMap.entrySet()) { if (e.getValue() == this) { @@ -504,7 +508,8 @@ public class LogicalPlan implements Serializable, DAG return this; } - public List<InputPortMeta> getSinks() + @Override + public Collection<InputPortMeta> getSinks() { return sinks; } @@ -517,7 +522,7 @@ public class LogicalPlan implements Serializable, DAG return this; } InputPortMeta portMeta = assertGetPortMeta(port); - OperatorMeta om = portMeta.getOperatorWrapper(); + OperatorMeta om = portMeta.getOperatorMeta(); String portName = portMeta.getPortName(); if (om.inputStreams.containsKey(portMeta)) { throw new IllegalArgumentException(String.format("Port %s already connected to stream %s", portName, om.inputStreams.get(portMeta))); @@ -539,9 +544,9 @@ public class LogicalPlan implements Serializable, DAG public void remove() { for (InputPortMeta ipm : this.sinks) { - ipm.getOperatorWrapper().inputStreams.remove(ipm); - if (ipm.getOperatorWrapper().inputStreams.isEmpty()) { - rootOperators.add(ipm.getOperatorWrapper()); + ipm.getOperatorMeta().inputStreams.remove(ipm); + if (ipm.getOperatorMeta().inputStreams.isEmpty()) { + rootOperators.add(ipm.getOperatorMeta()); } } // Remove persist operator for at stream level if present: @@ -697,7 +702,7 @@ public class LogicalPlan implements Serializable, DAG private void setPersistOperatorInputPort(InputPortMeta inport) { - this.addSink(inport.getPortObject()); + this.addSink(inport.getPort()); this.persistOperatorInputPort = inport; } @@ -714,7 +719,7 @@ public class LogicalPlan implements Serializable, DAG private String getPersistOperatorName(InputPort<?> sinkToPersist) { InputPortMeta portMeta = assertGetPortMeta(sinkToPersist); - OperatorMeta operatorMeta = portMeta.getOperatorWrapper(); + OperatorMeta operatorMeta = portMeta.getOperatorMeta(); return id + "_" + operatorMeta.getName() + "_persister"; } @@ -735,7 +740,7 @@ public class LogicalPlan implements Serializable, DAG { StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) - : (StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec(); + : (StreamCodec<Object>)sinkToPersistPortMeta.getPort().getStreamCodec(); if (inputStreamCodec != null) { Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>(); codecs.put(sinkToPersistPortMeta, inputStreamCodec); @@ -1138,6 +1143,7 @@ public class LogicalPlan implements Serializable, DAG return getPortMapping().inPortMap.get(port); } + @Override public Map<InputPortMeta, StreamMeta> getInputStreams() { return this.inputStreams; @@ -1359,7 +1365,7 @@ public class LogicalPlan implements Serializable, DAG Map<InputPortMeta, StreamMeta> inputStreams = om.getInputStreams(); for (Map.Entry<InputPortMeta, StreamMeta> e : inputStreams.entrySet()) { StreamMeta stream = e.getValue(); - if (e.getKey().getOperatorWrapper() == om) { + if (e.getKey().getOperatorMeta() == om) { stream.sinks.remove(e.getKey()); } // If persistStream was enabled for stream, reset stream when sink removed @@ -1424,12 +1430,12 @@ public class LogicalPlan implements Serializable, DAG OutputPortMeta sourceMeta = streamMeta.getSource(); List<InputPort<?>> ports = new LinkedList<>(); for (InputPortMeta inputPortMeta : streamMeta.getSinks()) { - ports.add(inputPortMeta.getPortObject()); + ports.add(inputPortMeta.getPort()); } InputPort[] inputPorts = ports.toArray(new InputPort[]{}); name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName(); - StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts); + StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPort(), inputPorts); streamMetaNew.setLocality(streamMeta.getLocality()); } } @@ -1531,6 +1537,12 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableList(this.rootOperators); } + @Override + public List<OperatorMeta> getRootOperatorsMeta() + { + return getRootOperators(); + } + public List<OperatorMeta> getLeafOperators() { return Collections.unmodifiableList(this.leafOperators); @@ -1541,6 +1553,12 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableCollection(this.operators.values()); } + @Override + public Collection<OperatorMeta> getAllOperatorsMeta() + { + return getAllOperators(); + } + public Collection<ModuleMeta> getAllModules() { return Collections.unmodifiableCollection(this.modules.values()); @@ -1552,6 +1570,12 @@ public class LogicalPlan implements Serializable, DAG } @Override + public Collection<StreamMeta> getAllStreamsMeta() + { + return getAllStreams(); + } + + @Override public OperatorMeta getOperatorMeta(String operatorName) { return this.operators.get(operatorName); @@ -1618,7 +1642,7 @@ public class LogicalPlan implements Serializable, DAG if (streamCodec != null) { classNames.add(streamCodec.getClass().getName()); } else { - StreamCodec<?> codec = sink.getPortObject().getStreamCodec(); + StreamCodec<?> codec = sink.getPort().getStreamCodec(); if (codec != null) { classNames.add(codec.getClass().getName()); } @@ -1927,7 +1951,7 @@ public class LogicalPlan implements Serializable, DAG for (StreamMeta stream : getAllStreams()) { String source = stream.source.getOperatorMeta().getName(); for (InputPortMeta sink : stream.sinks) { - String sinkOperator = sink.getOperatorWrapper().getName(); + String sinkOperator = sink.getOperatorMeta().getName(); OperatorPair pair = new OperatorPair(source, sinkOperator); if (stream.getLocality() != null && stream.getLocality().ordinal() <= Locality.NODE_LOCAL.ordinal() && hostNamesMapping.containsKey(pair.first) && hostNamesMapping.containsKey(pair.second) && !hostNamesMapping.get(pair.first).equals(hostNamesMapping.get(pair.second))) { throw new ValidationException(String.format("Host Locality for operators: %s(host: %s) & %s(host: %s) conflicts with stream locality", pair.first, hostNamesMapping.get(pair.first), pair.second, hostNamesMapping.get(pair.second))); @@ -2188,7 +2212,7 @@ public class LogicalPlan implements Serializable, DAG // depth first successors traversal for (StreamMeta downStream: om.outputStreams.values()) { for (InputPortMeta sink: downStream.sinks) { - OperatorMeta successor = sink.getOperatorWrapper(); + OperatorMeta successor = sink.getOperatorMeta(); if (successor == null) { continue; } @@ -2256,7 +2280,7 @@ public class LogicalPlan implements Serializable, DAG for (StreamMeta downStream: om.outputStreams.values()) { for (InputPortMeta sink : downStream.sinks) { - OperatorMeta successor = sink.getOperatorWrapper(); + OperatorMeta successor = sink.getOperatorMeta(); if (isDelayOperator) { sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true); // Check whether all downstream operators are already visited in the path @@ -2285,7 +2309,7 @@ public class LogicalPlan implements Serializable, DAG Operator.ProcessingMode pm = om.getValue(OperatorContext.PROCESSING_MODE); for (StreamMeta os : om.outputStreams.values()) { for (InputPortMeta sink: os.sinks) { - OperatorMeta sinkOm = sink.getOperatorWrapper(); + OperatorMeta sinkOm = sink.getOperatorMeta(); Operator.ProcessingMode sinkPm = sinkOm.attributes == null ? null : sinkOm.attributes.get(OperatorContext.PROCESSING_MODE); if (sinkPm == null) { // If the source processing mode is AT_MOST_ONCE and a processing mode is not specified for the sink then http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java index d5045b4..471dca2 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java @@ -359,7 +359,7 @@ public class PTOperator implements java.io.Serializable if (partitionKeys != null) { pkeys = Maps.newHashMapWithExpectedSize(partitionKeys.size()); for (Map.Entry<InputPortMeta, PartitionKeys> e : partitionKeys.entrySet()) { - pkeys.put(e.getKey().getPortObject(), e.getValue()); + pkeys.put(e.getKey().getPort(), e.getValue()); } } return pkeys; http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 4181971..94f47e6 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -573,7 +573,7 @@ public class PhysicalPlan implements Serializable private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta) { - Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper()); + Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorMeta()); Collection<PartitionKeys> partitionKeysList = new ArrayList<>(); for (PTOperator p : ptOperators) { PartitionKeys keys = p.partitionKeys.get(sinkPortMeta); @@ -593,7 +593,7 @@ public class PhysicalPlan implements Serializable Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>(); // Logging is enabled for the stream for (InputPortMeta portMeta : s.getSinksToPersist()) { - InputPort<?> port = portMeta.getPortObject(); + InputPort<?> port = portMeta.getPort(); StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec(); if (inputStreamCodec != null) { boolean alreadyAdded = false; @@ -619,7 +619,7 @@ public class PhysicalPlan implements Serializable // Create Wrapper codec for Stream persistence using all unique // stream codecs // Logger should write merged or union of all input stream codecs - StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec(); + StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPort().getStreamCodec(); @SuppressWarnings({ "unchecked", "rawtypes" }) StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger); streamMetaToCodecMap.put(s, codec); @@ -629,7 +629,7 @@ public class PhysicalPlan implements Serializable } for (java.util.Map.Entry<StreamMeta, StreamCodec<?>> entry : streamMetaToCodecMap.entrySet()) { - dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPortObject(), PortContext.STREAM_CODEC, entry.getValue()); + dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPort(), PortContext.STREAM_CODEC, entry.getValue()); } } catch (Exception e) { throw Throwables.propagate(e); @@ -1326,7 +1326,7 @@ public class PhysicalPlan implements Serializable // copy list as it is modified by recursive remove for (PTInput in : Lists.newArrayList(out.sinks)) { for (LogicalPlan.InputPortMeta im : in.logicalStream.getSinks()) { - PMapping m = this.logicalToPTOperator.get(im.getOperatorWrapper()); + PMapping m = this.logicalToPTOperator.get(im.getOperatorMeta()); if (m.parallelPartitions == operatorMapping.parallelPartitions) { // associated operator parallel partitioned removePartition(in.target, operatorMapping); @@ -1439,7 +1439,7 @@ public class PhysicalPlan implements Serializable List<InputPort<?>> inputPortList = Lists.newArrayList(); for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) { - inputPortList.add(inputPortMeta.getPortObject()); + inputPortList.add(inputPortMeta.getPort()); } return inputPortList; @@ -1609,7 +1609,7 @@ public class PhysicalPlan implements Serializable for (PTInput in : operator.inputs) { if (in.logicalStream.getPersistOperator() != null) { for (InputPortMeta inputPort : in.logicalStream.getSinksToPersist()) { - if (inputPort.getOperatorWrapper().equals(operator.operatorMeta)) { + if (inputPort.getOperatorMeta().equals(operator.operatorMeta)) { // Redeploy the stream wide persist operator only if the current sink is being persisted persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator())); break; @@ -1689,7 +1689,7 @@ public class PhysicalPlan implements Serializable { // remove incoming connections for logical stream for (InputPortMeta ipm : sm.getSinks()) { - OperatorMeta om = ipm.getOperatorWrapper(); + OperatorMeta om = ipm.getOperatorMeta(); PMapping m = this.logicalToPTOperator.get(om); if (m == null) { throw new AssertionError("Unknown operator " + om); @@ -1735,7 +1735,7 @@ public class PhysicalPlan implements Serializable */ public void connectInput(InputPortMeta ipm) { - for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : ipm.getOperatorWrapper().getInputStreams().entrySet()) { + for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : ipm.getOperatorMeta().getInputStreams().entrySet()) { if (inputEntry.getKey() == ipm) { // initialize outputs for existing operators for (Map.Entry<LogicalPlan.OutputPortMeta, StreamMeta> outputEntry : inputEntry.getValue().getSource().getOperatorMeta().getOutputStreams().entrySet()) { @@ -1746,7 +1746,7 @@ public class PhysicalPlan implements Serializable deployOpers.add(oper); } } - PMapping m = this.logicalToPTOperator.get(ipm.getOperatorWrapper()); + PMapping m = this.logicalToPTOperator.get(ipm.getOperatorMeta()); updateStreamMappings(m); for (PTOperator oper : m.partitions) { undeployOpers.add(oper); http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java index 165517d..3fbd6f9 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java @@ -90,7 +90,7 @@ public class PlanModifier sm.addSink(sink); if (physicalPlan != null) { for (InputPortMeta ipm : sm.getSinks()) { - if (ipm.getPortObject() == sink) { + if (ipm.getPort() == sink) { physicalPlan.connectInput(ipm); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java index 81d6d44..c50ed79 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java @@ -209,8 +209,8 @@ public class StreamMapping implements java.io.Serializable for (InputPortMeta ipm : streamMeta.getSinks()) { // gets called prior to all logical operators mapped // skipped for parallel partitions - those are handled elsewhere - if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && plan.hasMapping(ipm.getOperatorWrapper())) { - List<PTOperator> partitions = plan.getOperators(ipm.getOperatorWrapper()); + if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && plan.hasMapping(ipm.getOperatorMeta())) { + List<PTOperator> partitions = plan.getOperators(ipm.getOperatorMeta()); for (PTOperator doper : partitions) { downstreamOpers.add(new Pair<>(doper, ipm)); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 4ff9e51..8cb4871 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -932,7 +932,7 @@ public class StreamCodecTest PTOperator.PTOutput out = operator.getOutputs().get(0); Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size()); PTOperator.PTInput idInput = out.sinks.get(0); - LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorWrapper(); + LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorMeta(); Operator.InputPort<?> idInputPort = null; if (idMeta == n2meta) { idInputPort = node2.inport1; http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java index dbb8d34..caa1bf3 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java @@ -143,7 +143,7 @@ public class LogicalPlanConfigurationTest assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport))); assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta()); Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size()); - Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().get(0).getOperatorWrapper()); + Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().iterator().next().getOperatorMeta()); assertEquals("stream name", "n1n2", n1n2.getName()); Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality()); @@ -154,7 +154,7 @@ public class LogicalPlanConfigurationTest Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) { - targetNodes.add(ip.getOperatorWrapper()); + targetNodes.add(ip.getOperatorMeta()); } Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes); @@ -181,7 +181,7 @@ public class LogicalPlanConfigurationTest for (StreamMeta downStream : operator.getOutputStreams().values()) { if (!downStream.getSinks().isEmpty()) { for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) { - printTopology(targetNode.getOperatorWrapper(), tplg, level + 1); + printTopology(targetNode.getOperatorMeta(), tplg, level + 1); } } } @@ -228,7 +228,7 @@ public class LogicalPlanConfigurationTest Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta()); Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { - targetNodes.add(targetPort.getOperatorWrapper()); + targetNodes.add(targetPort.getOperatorMeta()); } Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes); @@ -296,7 +296,7 @@ public class LogicalPlanConfigurationTest Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta()); Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { - targetNodes.add(targetPort.getOperatorWrapper()); + targetNodes.add(targetPort.getOperatorMeta()); } Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB)); Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT)); http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java index 97a375f..5b5583a 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java @@ -459,7 +459,7 @@ public class TestModuleExpansion List<String> sinksName = new ArrayList<>(); for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) { - sinksName.add(inputPortMeta.getOperatorWrapper().getName()); + sinksName.add(inputPortMeta.getOperatorMeta().getName()); } Assert.assertTrue(inputOperatorName.equals(sourceName));