Repository: apex-core
Updated Branches:
  refs/heads/master 74f732a79 -> abc836ca1


APEXCORE-604 extend DAG API to get operators and streams from the DAG.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/abc836ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/abc836ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/abc836ca

Branch: refs/heads/master
Commit: abc836ca18d63a895d431dc5acf4d3a1018b235d
Parents: 74f732a
Author: Tushar R. Gosavi <tus...@apache.org>
Authored: Tue Jan 17 12:11:12 2017 +0530
Committer: Tushar R. Gosavi <tus...@apache.org>
Committed: Thu Feb 16 14:06:38 2017 +0530

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  | 71 ++++++++++++++++++++
 .../stram/StreamingContainerAgent.java          |  2 +-
 .../stram/StreamingContainerManager.java        |  4 +-
 .../stram/codec/LogicalPlanSerializer.java      |  9 ++-
 .../stram/plan/logical/LogicalPlan.java         | 62 +++++++++++------
 .../stram/plan/physical/PTOperator.java         |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 20 +++---
 .../stram/plan/physical/PlanModifier.java       |  2 +-
 .../stram/plan/physical/StreamMapping.java      |  4 +-
 .../com/datatorrent/stram/StreamCodecTest.java  |  2 +-
 .../logical/LogicalPlanConfigurationTest.java   | 10 +--
 .../logical/module/TestModuleExpansion.java     |  2 +-
 12 files changed, 142 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java 
b/api/src/main/java/com/datatorrent/api/DAG.java
index b80bc93..532ff72 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -19,6 +19,8 @@
 package com.datatorrent.api;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -41,11 +43,26 @@ public interface DAG extends DAGContext, Serializable
 {
   interface InputPortMeta extends Serializable, PortContext
   {
+    /**
+     * Return the port object represented by this InputPortMeta.
+     * @return the input port object this meta describes
+     */
+    public Operator.InputPort<?> getPort();
+
+    public <T extends OperatorMeta> T getOperatorMeta();
   }
 
   interface OutputPortMeta extends Serializable, PortContext
   {
     OperatorMeta getUnifierMeta();
+
+    /**
+     * Return the port object represented by this OutputPortMeta.
+     * @return the output port object this meta describes
+     */
+    public Operator.OutputPort<?> getPort();
+
+    public <T extends OperatorMeta> T getOperatorMeta();
   }
 
   /**
@@ -143,6 +160,19 @@ public interface DAG extends DAGContext, Serializable
      */
     public StreamMeta persistUsing(String name, Operator persistOperator, 
Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> 
sinkToPersist);
 
+    /**
+     * Return the source of the stream.
+     * @param <T> concrete type of the output port meta
+     * @return the output port meta that is the source of this stream
+     */
+    public <T extends OutputPortMeta> T getSource();
+
+    /**
+     * Return all sinks connected to this stream.
+     * @param <T> concrete type of the input port meta
+     * @return the input port metas of all sinks connected to this stream
+     */
+    public <T extends InputPortMeta> Collection<T> getSinks();
   }
 
   /**
@@ -157,6 +187,22 @@ public interface DAG extends DAGContext, Serializable
     public InputPortMeta getMeta(Operator.InputPort<?> port);
 
     public OutputPortMeta getMeta(Operator.OutputPort<?> port);
+
+    /**
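+     * Return the streams connected to this operator's input ports.
+     * @param <K> concrete type of the input port meta
+     * @param <V> concrete type of the stream meta
+     * @return map from each connected input port meta to its stream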
+     */
+    public <K extends InputPortMeta, V extends StreamMeta> Map<K, V> 
getInputStreams();
+
+    /**
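+     * Return the streams connected to this operator's output ports.
+     * @param <K> concrete type of the output port meta
+     * @param <V> concrete type of the stream meta
+     * @return map from each connected output port meta to its stream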
+     */
+    public <K extends OutputPortMeta, V extends StreamMeta> Map<K, V> 
getOutputStreams();
   }
 
   /**
@@ -281,6 +327,31 @@ public interface DAG extends DAGContext, Serializable
   public abstract OperatorMeta getMeta(Operator operator);
 
   /**
+   * Return all operators present in the DAG.
+   * @param <T> concrete type of the operator meta
+   * @return the meta objects of all operators in the DAG
+   */
+  public <T extends OperatorMeta> Collection<T> getAllOperatorsMeta();
+
+  /**
+   * Get all input operators in the DAG. This method returns operators which 
are
+   * not connected to any upstream operator, i.e. the operators which do not 
have
+   * any input ports or operators which are not connected through any input 
ports
+   * in the DAG.
+   *
+   * @param <T> concrete type of the operator meta
+   * @return collection of {@link OperatorMeta} for root operators in the DAG.
+   */
+  public <T extends OperatorMeta> Collection<T> getRootOperatorsMeta();
+
+  /**
+   * Returns all streams present in the DAG.
+   * @param <T> concrete type of the stream meta
+   * @return the meta objects of all streams in the DAG
+   */
+  public <T extends StreamMeta> Collection<T> getAllStreamsMeta();
+
+  /**
    * Marker interface for the Node in the DAG. Any object which can be added 
as a Node in the DAG
    * needs to implement this interface.
    */

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 1d0897d..eb7aa43 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -350,7 +350,7 @@ public class StreamingContainerAgent
       StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
       if (codec == null) {
         // it cannot be this object that gets returned. Depending on this 
value is dangerous
-        codec = inputPortMeta.getPortObject().getStreamCodec();
+        codec = inputPortMeta.getPort().getStreamCodec();
         if (codec != null) {
           // don't create codec multiple times - it will assign a new 
identifier
           inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 00a406c..dfbc7d1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -706,7 +706,7 @@ public class StreamingContainerManager implements 
PlanContext
               appDataSource.setQueryOperatorName(queryOperatorName);
               appDataSource.setQueryTopic(queryTopic);
               appDataSource.setQueryUrl(convertAppDataUrl(queryUrl));
-              List<LogicalPlan.InputPortMeta> sinks = 
entry.getValue().getSinks();
+              Collection<LogicalPlan.InputPortMeta> sinks = 
entry.getValue().getSinks();
               if (sinks.isEmpty()) {
                 LOG.warn("There is no result operator for the App Data Source 
{}.{}. Ignoring the App Data Source.", operatorMeta.getName(), 
portMeta.getPortName());
                 continue;
@@ -715,7 +715,7 @@ public class StreamingContainerManager implements 
PlanContext
                 LOG.warn("There are multiple result operators for the App Data 
Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), 
portMeta.getPortName());
                 continue;
               }
-              OperatorMeta resultOperatorMeta = 
sinks.get(0).getOperatorWrapper();
+              OperatorMeta resultOperatorMeta = 
sinks.iterator().next().getOperatorMeta();
               if (resultOperatorMeta.getOperator() instanceof 
AppData.ConnectionInfoProvider) {
                 AppData.ConnectionInfoProvider resultOperator = 
(AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator();
                 
appDataSource.setResultOperatorName(resultOperatorMeta.getName());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java 
b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 6607321..b1d1fd8 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import javax.ws.rs.Produces;
@@ -203,11 +202,11 @@ public class LogicalPlanSerializer extends 
JsonSerializer<LogicalPlan>
       sourcePortDetailMap.put("portName", sourcePortName);
       streamDetailMap.put("name", streamName);
       streamDetailMap.put("source", sourcePortDetailMap);
-      List<InputPortMeta> sinks = streamMeta.getSinks();
+      Collection<InputPortMeta> sinks = streamMeta.getSinks();
       ArrayList<HashMap<String, Object>> sinkPortList = new ArrayList<>();
       for (InputPortMeta sinkPort : sinks) {
         HashMap<String, Object> sinkPortDetailMap = new HashMap<>();
-        sinkPortDetailMap.put("operatorName", 
sinkPort.getOperatorWrapper().getName());
+        sinkPortDetailMap.put("operatorName", 
sinkPort.getOperatorMeta().getName());
         sinkPortDetailMap.put("portName", sinkPort.getPortName());
         sinkPortList.add(sinkPortDetailMap);
       }
@@ -257,14 +256,14 @@ public class LogicalPlanSerializer extends 
JsonSerializer<LogicalPlan>
     for (StreamMeta streamMeta : allStreams) {
       String streamKey = LogicalPlanConfiguration.STREAM_PREFIX + 
streamMeta.getName();
       OutputPortMeta source = streamMeta.getSource();
-      List<InputPortMeta> sinks = streamMeta.getSinks();
+      Collection<InputPortMeta> sinks = streamMeta.getSinks();
       props.setProperty(streamKey + "." + 
LogicalPlanConfiguration.STREAM_SOURCE, source.getOperatorMeta().getName() + 
"." + source.getPortName());
       String sinksValue = "";
       for (InputPortMeta sink : sinks) {
         if (!sinksValue.isEmpty()) {
           sinksValue += ",";
         }
-        sinksValue += sink.getOperatorWrapper().getName() + "." + 
sink.getPortName();
+        sinksValue += sink.getOperatorMeta().getName() + "." + 
sink.getPortName();
       }
       props.setProperty(streamKey + "." + 
LogicalPlanConfiguration.STREAM_SINKS, sinksValue);
       if (streamMeta.getLocality() != null) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 1371ce8..e1debbb 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -242,7 +242,8 @@ public class LogicalPlan implements Serializable, DAG
     //This is null when port is not hidden
     private Class<?> classDeclaringHiddenPort;
 
-    public OperatorMeta getOperatorWrapper()
+    @Override
+    public OperatorMeta getOperatorMeta()
     {
       return operatorMeta;
     }
@@ -252,7 +253,8 @@ public class LogicalPlan implements Serializable, DAG
       return fieldName;
     }
 
-    public InputPort<?> getPortObject()
+    @Override
+    public InputPort<?> getPort()
     {
       for (Map.Entry<InputPort<?>, InputPortMeta> e : 
operatorMeta.getPortMapping().inPortMap.entrySet()) {
         if (e.getValue() == this) {
@@ -328,6 +330,7 @@ public class LogicalPlan implements Serializable, DAG
       this.attributes = new DefaultAttributeMap();
     }
 
+    @Override
     public OperatorMeta getOperatorMeta()
     {
       return operatorMeta;
@@ -363,7 +366,8 @@ public class LogicalPlan implements Serializable, DAG
       return fieldName;
     }
 
-    public OutputPort<?> getPortObject()
+    @Override
+    public OutputPort<?> getPort()
     {
       for (Map.Entry<OutputPort<?>, OutputPortMeta> e : 
operatorMeta.getPortMapping().outPortMap.entrySet()) {
         if (e.getValue() == this) {
@@ -504,7 +508,8 @@ public class LogicalPlan implements Serializable, DAG
       return this;
     }
 
-    public List<InputPortMeta> getSinks()
+    @Override
+    public Collection<InputPortMeta> getSinks()
     {
       return sinks;
     }
@@ -517,7 +522,7 @@ public class LogicalPlan implements Serializable, DAG
         return this;
       }
       InputPortMeta portMeta = assertGetPortMeta(port);
-      OperatorMeta om = portMeta.getOperatorWrapper();
+      OperatorMeta om = portMeta.getOperatorMeta();
       String portName = portMeta.getPortName();
       if (om.inputStreams.containsKey(portMeta)) {
         throw new IllegalArgumentException(String.format("Port %s already 
connected to stream %s", portName, om.inputStreams.get(portMeta)));
@@ -539,9 +544,9 @@ public class LogicalPlan implements Serializable, DAG
     public void remove()
     {
       for (InputPortMeta ipm : this.sinks) {
-        ipm.getOperatorWrapper().inputStreams.remove(ipm);
-        if (ipm.getOperatorWrapper().inputStreams.isEmpty()) {
-          rootOperators.add(ipm.getOperatorWrapper());
+        ipm.getOperatorMeta().inputStreams.remove(ipm);
+        if (ipm.getOperatorMeta().inputStreams.isEmpty()) {
+          rootOperators.add(ipm.getOperatorMeta());
         }
       }
       // Remove persist operator for at stream level if present:
@@ -697,7 +702,7 @@ public class LogicalPlan implements Serializable, DAG
 
     private void setPersistOperatorInputPort(InputPortMeta inport)
     {
-      this.addSink(inport.getPortObject());
+      this.addSink(inport.getPort());
       this.persistOperatorInputPort = inport;
     }
 
@@ -714,7 +719,7 @@ public class LogicalPlan implements Serializable, DAG
     private String getPersistOperatorName(InputPort<?> sinkToPersist)
     {
       InputPortMeta portMeta = assertGetPortMeta(sinkToPersist);
-      OperatorMeta operatorMeta = portMeta.getOperatorWrapper();
+      OperatorMeta operatorMeta = portMeta.getOperatorMeta();
       return id + "_" + operatorMeta.getName() + "_persister";
     }
 
@@ -735,7 +740,7 @@ public class LogicalPlan implements Serializable, DAG
     {
       StreamCodec<Object> inputStreamCodec = 
sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null
           ? 
(StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC)
-          : 
(StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec();
+          : 
(StreamCodec<Object>)sinkToPersistPortMeta.getPort().getStreamCodec();
       if (inputStreamCodec != null) {
         Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>();
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
@@ -1138,6 +1143,7 @@ public class LogicalPlan implements Serializable, DAG
       return getPortMapping().inPortMap.get(port);
     }
 
+    @Override
     public Map<InputPortMeta, StreamMeta> getInputStreams()
     {
       return this.inputStreams;
@@ -1359,7 +1365,7 @@ public class LogicalPlan implements Serializable, DAG
     Map<InputPortMeta, StreamMeta> inputStreams = om.getInputStreams();
     for (Map.Entry<InputPortMeta, StreamMeta> e : inputStreams.entrySet()) {
       StreamMeta stream = e.getValue();
-      if (e.getKey().getOperatorWrapper() == om) {
+      if (e.getKey().getOperatorMeta() == om) {
         stream.sinks.remove(e.getKey());
       }
       // If persistStream was enabled for stream, reset stream when sink 
removed
@@ -1424,12 +1430,12 @@ public class LogicalPlan implements Serializable, DAG
       OutputPortMeta sourceMeta = streamMeta.getSource();
       List<InputPort<?>> ports = new LinkedList<>();
       for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
-        ports.add(inputPortMeta.getPortObject());
+        ports.add(inputPortMeta.getPort());
       }
       InputPort[] inputPorts = ports.toArray(new InputPort[]{});
 
       name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
-      StreamMeta streamMetaNew = this.addStream(name, 
sourceMeta.getPortObject(), inputPorts);
+      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPort(), 
inputPorts);
       streamMetaNew.setLocality(streamMeta.getLocality());
     }
   }
@@ -1531,6 +1537,12 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableList(this.rootOperators);
   }
 
+  @Override
+  public List<OperatorMeta> getRootOperatorsMeta()
+  {
+    return getRootOperators();
+  }
+
   public List<OperatorMeta> getLeafOperators()
   {
     return Collections.unmodifiableList(this.leafOperators);
@@ -1541,6 +1553,12 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableCollection(this.operators.values());
   }
 
+  @Override
+  public Collection<OperatorMeta> getAllOperatorsMeta()
+  {
+    return getAllOperators();
+  }
+
   public Collection<ModuleMeta> getAllModules()
   {
     return Collections.unmodifiableCollection(this.modules.values());
@@ -1552,6 +1570,12 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
+  public Collection<StreamMeta> getAllStreamsMeta()
+  {
+    return getAllStreams();
+  }
+
+  @Override
   public OperatorMeta getOperatorMeta(String operatorName)
   {
     return this.operators.get(operatorName);
@@ -1618,7 +1642,7 @@ public class LogicalPlan implements Serializable, DAG
         if (streamCodec != null) {
           classNames.add(streamCodec.getClass().getName());
         } else {
-          StreamCodec<?> codec = sink.getPortObject().getStreamCodec();
+          StreamCodec<?> codec = sink.getPort().getStreamCodec();
           if (codec != null) {
             classNames.add(codec.getClass().getName());
           }
@@ -1927,7 +1951,7 @@ public class LogicalPlan implements Serializable, DAG
     for (StreamMeta stream : getAllStreams()) {
       String source = stream.source.getOperatorMeta().getName();
       for (InputPortMeta sink : stream.sinks) {
-        String sinkOperator = sink.getOperatorWrapper().getName();
+        String sinkOperator = sink.getOperatorMeta().getName();
         OperatorPair pair = new OperatorPair(source, sinkOperator);
         if (stream.getLocality() != null && stream.getLocality().ordinal() <= 
Locality.NODE_LOCAL.ordinal() && hostNamesMapping.containsKey(pair.first) && 
hostNamesMapping.containsKey(pair.second) && 
!hostNamesMapping.get(pair.first).equals(hostNamesMapping.get(pair.second))) {
           throw new ValidationException(String.format("Host Locality for 
operators: %s(host: %s) & %s(host: %s) conflicts with stream locality", 
pair.first, hostNamesMapping.get(pair.first), pair.second, 
hostNamesMapping.get(pair.second)));
@@ -2188,7 +2212,7 @@ public class LogicalPlan implements Serializable, DAG
     // depth first successors traversal
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink: downStream.sinks) {
-        OperatorMeta successor = sink.getOperatorWrapper();
+        OperatorMeta successor = sink.getOperatorMeta();
         if (successor == null) {
           continue;
         }
@@ -2256,7 +2280,7 @@ public class LogicalPlan implements Serializable, DAG
 
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink : downStream.sinks) {
-        OperatorMeta successor = sink.getOperatorWrapper();
+        OperatorMeta successor = sink.getOperatorMeta();
         if (isDelayOperator) {
           sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
           // Check whether all downstream operators are already visited in the 
path
@@ -2285,7 +2309,7 @@ public class LogicalPlan implements Serializable, DAG
     Operator.ProcessingMode pm = om.getValue(OperatorContext.PROCESSING_MODE);
     for (StreamMeta os : om.outputStreams.values()) {
       for (InputPortMeta sink: os.sinks) {
-        OperatorMeta sinkOm = sink.getOperatorWrapper();
+        OperatorMeta sinkOm = sink.getOperatorMeta();
         Operator.ProcessingMode sinkPm = sinkOm.attributes == null ? null : 
sinkOm.attributes.get(OperatorContext.PROCESSING_MODE);
         if (sinkPm == null) {
           // If the source processing mode is AT_MOST_ONCE and a processing 
mode is not specified for the sink then

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index d5045b4..471dca2 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -359,7 +359,7 @@ public class PTOperator implements java.io.Serializable
     if (partitionKeys != null) {
       pkeys = Maps.newHashMapWithExpectedSize(partitionKeys.size());
       for (Map.Entry<InputPortMeta, PartitionKeys> e : 
partitionKeys.entrySet()) {
-        pkeys.put(e.getKey().getPortObject(), e.getValue());
+        pkeys.put(e.getKey().getPort(), e.getValue());
       }
     }
     return pkeys;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 4181971..94f47e6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -573,7 +573,7 @@ public class PhysicalPlan implements Serializable
 
   private void updatePersistOperatorWithSinkPartitions(InputPortMeta 
persistInputPort, OperatorMeta persistOperatorMeta, 
StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta)
   {
-    Collection<PTOperator> ptOperators = 
getOperators(sinkPortMeta.getOperatorWrapper());
+    Collection<PTOperator> ptOperators = 
getOperators(sinkPortMeta.getOperatorMeta());
     Collection<PartitionKeys> partitionKeysList = new ArrayList<>();
     for (PTOperator p : ptOperators) {
       PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
@@ -593,7 +593,7 @@ public class PhysicalPlan implements Serializable
             Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new 
HashMap<>();
             // Logging is enabled for the stream
             for (InputPortMeta portMeta : s.getSinksToPersist()) {
-              InputPort<?> port = portMeta.getPortObject();
+              InputPort<?> port = portMeta.getPort();
               StreamCodec<?> inputStreamCodec = 
(portMeta.getValue(PortContext.STREAM_CODEC) != null) ? 
portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec();
               if (inputStreamCodec != null) {
                 boolean alreadyAdded = false;
@@ -619,7 +619,7 @@ public class PhysicalPlan implements Serializable
               // Create Wrapper codec for Stream persistence using all unique
               // stream codecs
               // Logger should write merged or union of all input stream codecs
-              StreamCodec<?> specifiedCodecForLogger = 
(s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? 
s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : 
s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
+              StreamCodec<?> specifiedCodecForLogger = 
(s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? 
s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : 
s.getPersistOperatorInputPort().getPort().getStreamCodec();
               @SuppressWarnings({ "unchecked", "rawtypes" })
               StreamCodecWrapperForPersistance<Object> codec = new 
StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
               streamMetaToCodecMap.put(s, codec);
@@ -629,7 +629,7 @@ public class PhysicalPlan implements Serializable
       }
 
       for (java.util.Map.Entry<StreamMeta, StreamCodec<?>> entry : 
streamMetaToCodecMap.entrySet()) {
-        
dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPortObject(),
 PortContext.STREAM_CODEC, entry.getValue());
+        
dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPort(),
 PortContext.STREAM_CODEC, entry.getValue());
       }
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -1326,7 +1326,7 @@ public class PhysicalPlan implements Serializable
       // copy list as it is modified by recursive remove
       for (PTInput in : Lists.newArrayList(out.sinks)) {
         for (LogicalPlan.InputPortMeta im : in.logicalStream.getSinks()) {
-          PMapping m = this.logicalToPTOperator.get(im.getOperatorWrapper());
+          PMapping m = this.logicalToPTOperator.get(im.getOperatorMeta());
           if (m.parallelPartitions == operatorMapping.parallelPartitions) {
             // associated operator parallel partitioned
             removePartition(in.target, operatorMapping);
@@ -1439,7 +1439,7 @@ public class PhysicalPlan implements Serializable
     List<InputPort<?>> inputPortList = Lists.newArrayList();
 
     for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) 
{
-      inputPortList.add(inputPortMeta.getPortObject());
+      inputPortList.add(inputPortMeta.getPort());
     }
 
     return inputPortList;
@@ -1609,7 +1609,7 @@ public class PhysicalPlan implements Serializable
         for (PTInput in : operator.inputs) {
           if (in.logicalStream.getPersistOperator() != null) {
             for (InputPortMeta inputPort : 
in.logicalStream.getSinksToPersist()) {
-              if 
(inputPort.getOperatorWrapper().equals(operator.operatorMeta)) {
+              if (inputPort.getOperatorMeta().equals(operator.operatorMeta)) {
                 // Redeploy the stream wide persist operator only if the 
current sink is being persisted
                 
persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator()));
                 break;
@@ -1689,7 +1689,7 @@ public class PhysicalPlan implements Serializable
   {
     // remove incoming connections for logical stream
     for (InputPortMeta ipm : sm.getSinks()) {
-      OperatorMeta om = ipm.getOperatorWrapper();
+      OperatorMeta om = ipm.getOperatorMeta();
       PMapping m = this.logicalToPTOperator.get(om);
       if (m == null) {
         throw new AssertionError("Unknown operator " + om);
@@ -1735,7 +1735,7 @@ public class PhysicalPlan implements Serializable
    */
   public void connectInput(InputPortMeta ipm)
   {
-    for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : 
ipm.getOperatorWrapper().getInputStreams().entrySet()) {
+    for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : 
ipm.getOperatorMeta().getInputStreams().entrySet()) {
       if (inputEntry.getKey() == ipm) {
         // initialize outputs for existing operators
         for (Map.Entry<LogicalPlan.OutputPortMeta, StreamMeta> outputEntry : 
inputEntry.getValue().getSource().getOperatorMeta().getOutputStreams().entrySet())
 {
@@ -1746,7 +1746,7 @@ public class PhysicalPlan implements Serializable
             deployOpers.add(oper);
           }
         }
-        PMapping m = this.logicalToPTOperator.get(ipm.getOperatorWrapper());
+        PMapping m = this.logicalToPTOperator.get(ipm.getOperatorMeta());
         updateStreamMappings(m);
         for (PTOperator oper : m.partitions) {
           undeployOpers.add(oper);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
index 165517d..3fbd6f9 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
@@ -90,7 +90,7 @@ public class PlanModifier
       sm.addSink(sink);
       if (physicalPlan != null) {
         for (InputPortMeta ipm : sm.getSinks()) {
-          if (ipm.getPortObject() == sink) {
+          if (ipm.getPort() == sink) {
             physicalPlan.connectInput(ipm);
           }
         }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 81d6d44..c50ed79 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -209,8 +209,8 @@ public class StreamMapping implements java.io.Serializable
     for (InputPortMeta ipm : streamMeta.getSinks()) {
       // gets called prior to all logical operators mapped
       // skipped for parallel partitions - those are handled elsewhere
-      if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && 
plan.hasMapping(ipm.getOperatorWrapper())) {
-        List<PTOperator> partitions = 
plan.getOperators(ipm.getOperatorWrapper());
+      if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && 
plan.hasMapping(ipm.getOperatorMeta())) {
+        List<PTOperator> partitions = plan.getOperators(ipm.getOperatorMeta());
         for (PTOperator doper : partitions) {
           downstreamOpers.add(new Pair<>(doper, ipm));
         }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java 
b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 4ff9e51..8cb4871 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -932,7 +932,7 @@ public class StreamCodecTest
           PTOperator.PTOutput out = operator.getOutputs().get(0);
           Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size());
           PTOperator.PTInput idInput = out.sinks.get(0);
-          LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorWrapper();
+          LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorMeta();
           Operator.InputPort<?> idInputPort = null;
           if (idMeta == n2meta) {
             idInputPort = node2.inport1;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index dbb8d34..caa1bf3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -143,7 +143,7 @@ public class LogicalPlanConfigurationTest
     assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport)));
     assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta());
     Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size());
-    Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().get(0).getOperatorWrapper());
+    Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().iterator().next().getOperatorMeta());
 
     assertEquals("stream name", "n1n2", n1n2.getName());
     Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality());
@@ -154,7 +154,7 @@ public class LogicalPlanConfigurationTest
 
     Set<OperatorMeta> targetNodes = Sets.newHashSet();
     for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) {
-      targetNodes.add(ip.getOperatorWrapper());
+      targetNodes.add(ip.getOperatorMeta());
     }
     Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes);
 
@@ -181,7 +181,7 @@ public class LogicalPlanConfigurationTest
     for (StreamMeta downStream : operator.getOutputStreams().values()) {
       if (!downStream.getSinks().isEmpty()) {
         for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) {
-          printTopology(targetNode.getOperatorWrapper(), tplg, level + 1);
+          printTopology(targetNode.getOperatorMeta(), tplg, level + 1);
         }
       }
     }
@@ -228,7 +228,7 @@ public class LogicalPlanConfigurationTest
     Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta());
     Set<OperatorMeta> targetNodes = Sets.newHashSet();
     for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
-      targetNodes.add(targetPort.getOperatorWrapper());
+      targetNodes.add(targetPort.getOperatorMeta());
     }
 
     Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
@@ -296,7 +296,7 @@ public class LogicalPlanConfigurationTest
     Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta());
     Set<OperatorMeta> targetNodes = Sets.newHashSet();
     for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
-      targetNodes.add(targetPort.getOperatorWrapper());
+      targetNodes.add(targetPort.getOperatorMeta());
     }
     Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB));
     Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT));

http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 97a375f..5b5583a 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -459,7 +459,7 @@ public class TestModuleExpansion
 
     List<String> sinksName = new ArrayList<>();
     for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) {
-      sinksName.add(inputPortMeta.getOperatorWrapper().getName());
+      sinksName.add(inputPortMeta.getOperatorMeta().getName());
     }
 
     Assert.assertTrue(inputOperatorName.equals(sourceName));

Reply via email to