Taewoo Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1196

Change subject: Index-only plan step 2: Added SplitOperator
......................................................................

Index-only plan step 2: Added SplitOperator

 - Introduced SplitOperator that sends a tuple to only one output frame unlike 
the ReplicateOperator
   that propagates a tuple into all outputs frames.

Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
M 
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
33 files changed, 802 insertions(+), 246 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/96/1196/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index e04be6f..c5d379e 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -44,12 +44,12 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -57,6 +57,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -210,6 +211,11 @@
         }
 
         @Override
+        public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+            return null;
+        }
+
+        @Override
         public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
             return null;
         }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 143cf0b..f4210e9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -26,8 +26,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.om.base.AString;
@@ -67,18 +67,19 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -519,6 +520,11 @@
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) 
throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index c7b927e..4b2f5a8 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -59,6 +59,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -287,6 +288,11 @@
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) 
throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ef0f9da..8147148 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -33,17 +33,18 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -163,6 +164,11 @@
     }
 
     @Override
+    public Boolean visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return visitInputs(op);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index d39a8c8..c12ab60 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -45,6 +45,7 @@
     SCRIPT,
     SELECT,
     SINK,
+    SPLIT,
     SUBPLAN,
     TOKENIZE,
     UNIONALL,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 7d5cf16..de6852e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -53,6 +53,7 @@
     RANDOM_MERGE_EXCHANGE,
     RANGE_PARTITION_EXCHANGE,
     RANGE_PARTITION_MERGE_EXCHANGE,
+    REPLICATE,
     RTREE_SEARCH,
     RUNNING_AGGREGATE,
     SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
new file mode 100644
index 0000000..adf6945
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Abstract class for two replication related operator - replicate and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperator extends 
AbstractLogicalOperator {
+
+    private int outputArity;
+    private boolean[] outputMaterializationFlags;
+    private List<Mutable<ILogicalOperator>> outputs;
+
+    public AbstractReplicateOperator(int outputArity) {
+        this.outputArity = outputArity;
+        this.outputMaterializationFlags = new boolean[outputArity];
+        this.outputs = new ArrayList<>();
+    }
+
+    public AbstractReplicateOperator(int outputArity, boolean[] 
outputMaterializationFlags) {
+        this.outputArity = outputArity;
+        this.outputMaterializationFlags = outputMaterializationFlags;
+        this.outputs = new ArrayList<>();
+    }
+
+    @Override
+    public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        return true;
+    }
+
+    @Override
+    public void recomputeSchema() {
+        schema = new 
ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+    }
+
+    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+        // do nothing
+    }
+
+    public int getOutputArity() {
+        return outputArity;
+    }
+
+    public void setOutputMaterializationFlags(boolean[] 
outputMaterializationFlags) {
+        this.outputMaterializationFlags = outputMaterializationFlags;
+    }
+
+    public boolean[] getOutputMaterializationFlags() {
+        return outputMaterializationFlags;
+    }
+
+    public List<Mutable<ILogicalOperator>> getOutputs() {
+        return outputs;
+    }
+
+    @Override
+    public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    public boolean isBlocker() {
+        for (boolean requiresMaterialization : outputMaterializationFlags) {
+            if (requiresMaterialization) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 834107c..3c1b8b2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -18,36 +18,18 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class ReplicateOperator extends AbstractLogicalOperator {
-
-    private int outputArity;
-    private boolean[] outputMaterializationFlags;
-    private List<Mutable<ILogicalOperator>> outputs;
+public class ReplicateOperator extends AbstractReplicateOperator {
 
     public ReplicateOperator(int outputArity) {
-        this.outputArity = outputArity;
-        this.outputMaterializationFlags = new boolean[outputArity];
-        this.outputs = new ArrayList<>();
+        super(outputArity);
     }
 
     public ReplicateOperator(int outputArity, boolean[] 
outputMaterializationFlags) {
-        this.outputArity = outputArity;
-        this.outputMaterializationFlags = outputMaterializationFlags;
-        this.outputs = new ArrayList<>();
+        super(outputArity, outputMaterializationFlags);
     }
 
     @Override
@@ -58,60 +40,5 @@
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) 
throws AlgebricksException {
         return visitor.visitReplicateOperator(this, arg);
-    }
-
-    @Override
-    public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public boolean isMap() {
-        return true;
-    }
-
-    @Override
-    public void recomputeSchema() {
-        schema = new 
ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
-    }
-
-    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
-        // do nothing
-    }
-
-    public int getOutputArity() {
-        return outputArity;
-    }
-
-    public void setOutputMaterializationFlags(boolean[] 
outputMaterializationFlags) {
-        this.outputMaterializationFlags = outputMaterializationFlags;
-    }
-
-    public boolean[] getOutputMaterializationFlags() {
-        return outputMaterializationFlags;
-    }
-
-    public List<Mutable<ILogicalOperator>> getOutputs() {
-        return outputs;
-    }
-
-    @Override
-    public IVariableTypeEnvironment 
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    public boolean isBlocker() {
-        for (boolean requiresMaterialization : outputMaterializationFlags) {
-            if (requiresMaterialization) {
-                return true;
-            }
-        }
-        return false;
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
new file mode 100644
index 0000000..90a358f
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Split Operator receives a variable as conditional variable.
+ * Based on its value, it propagates each tuple to the corresponding output 
frame. (e.g., first output = 0, ...)
+ * Thus, unlike Replicate operator that does unconditional propagation to all 
outputs,
+ * this does a conditional propagate operation.
+ *
+ * @author waans11
+ */
+public class SplitOperator extends AbstractReplicateOperator {
+
+    // variable that keeps the output branch information for each tuple
+    private LogicalVariable conditionVar;
+
+    public SplitOperator(int outputArity, LogicalVariable conditionVar) {
+        super(outputArity);
+        this.conditionVar = conditionVar;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.SPLIT;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) 
throws AlgebricksException {
+        return visitor.visitSplitOperator(this, arg);
+    }
+
+    public LogicalVariable getConditionVar() {
+        return conditionVar;
+    }
+
+    // Get the field position of the conditional variable.
+    public int getConditionVarFieldPos() throws AlgebricksException {
+        if (schema != null) {
+            return schema.indexOf(conditionVar);
+        } else {
+            throw new AlgebricksException("SPLIT operator: can't get the 
position of conditional variable.");
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index b999493..7b19f08 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -55,6 +55,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
     }
 
     @Override
+    public Long visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
     public Long visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return op.getInputs().get(0).getValue().accept(this, arg);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 5e2a041..4370fc4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -74,6 +74,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -423,6 +424,12 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext ctx) 
throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c0e8b34..95110eb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -62,6 +62,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -298,6 +299,14 @@
     }
 
     @Override
+    public Boolean visitSplitOperator(SplitOperator op, ILogicalOperator arg) 
throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.SPLIT)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 965008c..7fa98a3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -22,8 +22,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -64,6 +64,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -178,6 +179,12 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, ILogicalOperator arg) 
throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 4241d84..fd23ed4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -52,18 +52,19 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -410,6 +411,13 @@
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, 
ILogicalOperator arg) throws AlgebricksException {
+        SplitOperator opCopy = new SplitOperator(op.getOutputArity(), 
op.getConditionVar());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg)
             throws AlgebricksException {
         MaterializeOperator opCopy = new MaterializeOperator();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8d3644d..995a4cb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -54,6 +54,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -193,6 +194,11 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) 
throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fde6a28..07a3e63 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -52,6 +52,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -59,6 +60,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -66,7 +68,6 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -180,6 +181,11 @@
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) 
throws AlgebricksException {
+        return new SplitOperator(op.getOutputArity(), op.getConditionVar());
+    }
+
+    @Override
     public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) 
throws AlgebricksException {
         ArrayList<LogicalVariable> newInputList = new ArrayList<>();
         ArrayList<LogicalVariable> newOutputList = new ArrayList<>();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f01b20f..4575fa2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -56,6 +56,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) 
throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext ctx) throws AlgebricksException {
         return null;
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 10659b1..fe38f13 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -48,11 +48,11 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -60,6 +60,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -255,6 +256,11 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return null;
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index d35153a..9cc56ef 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -46,11 +46,11 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -58,6 +58,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -288,6 +289,12 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         standardLayout(op);
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index f2da7c4..3b75746 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -46,12 +46,12 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -59,6 +59,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -414,6 +415,13 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, 
LogicalVariable> arg)
+            throws AlgebricksException {
+        op.substituteVar(arg.first, arg.second);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0d50dc2..ab6b01f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -59,6 +59,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -443,6 +444,15 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) {
+            VariableUtilities.getUsedVariables(outputOp.getValue(), 
usedVariables);
+        }
+        usedVariables.add(op.getConditionVar());
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return null;
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
new file mode 100644
index 0000000..1d13163
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractReplicatePOperator extends 
AbstractPhysicalOperator {
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) 
op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator 
op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        AbstractReplicateOperator rop = (AbstractReplicateOperator) op;
+        int[] outputDependencyLabels = new int[rop.getOutputArity()];
+        // change the labels of outputs that requires materialization to 1
+        boolean[] outputMaterializationFlags = 
rop.getOutputMaterializationFlags();
+        for (int i = 0; i < rop.getOutputArity(); i++) {
+            if (outputMaterializationFlags[i]) {
+                outputDependencyLabels[i] = 1;
+            }
+        }
+        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index db778f7..74739da 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -19,45 +19,23 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 
-public class ReplicatePOperator extends AbstractPhysicalOperator {
+public class ReplicatePOperator extends AbstractReplicatePOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.SPLIT;
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
-        return emptyUnaryRequirements();
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) 
op2.getDeliveredPhysicalProperties().clone();
+        return PhysicalOperatorTag.REPLICATE;
     }
 
     @Override
@@ -65,31 +43,18 @@
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), 
propagatedSchema, context);
+        RecordDescriptor recDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
 
         ReplicateOperator rop = (ReplicateOperator) op;
         int outputArity = rop.getOutputArity();
         boolean[] outputMaterializationFlags = 
rop.getOutputMaterializationFlags();
 
-        SplitOperatorDescriptor splitOpDesc = new 
SplitOperatorDescriptor(spec, recDescriptor, outputArity, 
outputMaterializationFlags);
+        ReplicateOperatorDescriptor splitOpDesc = new 
ReplicateOperatorDescriptor(spec, recDescriptor, outputArity,
+                outputMaterializationFlags);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
-    }
-
-    @Override
-    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator 
op) {
-        int[] inputDependencyLabels = new int[] { 0 };
-        ReplicateOperator rop = (ReplicateOperator) op;
-        int[] outputDependencyLabels = new int[rop.getOutputArity()];
-        // change the labels of outputs that requires materialization to 1
-        boolean[] outputMaterializationFlags = 
rop.getOutputMaterializationFlags();
-        for (int i = 0; i < rop.getOutputArity(); i++) {
-            if (outputMaterializationFlags[i]) {
-                outputDependencyLabels[i] = 1;
-            }
-        }
-        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
new file mode 100644
index 0000000..7ad8c52
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class SplitPOperator extends AbstractReplicatePOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SPLIT;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+
+        // To check the result value in the conditional variable field
+        IBinaryIntegerInspectorFactory intInsepctorFactory = 
context.getBinaryIntegerInspectorFactory();
+
+        SplitOperator sop = (SplitOperator) op;
+        int outputArity = sop.getOutputArity();
+
+        int pos = inputSchemas[0].findVariable(sop.getConditionVar());
+        if (pos < 0) {
+            throw new AlgebricksException("SPLIT operator: could not find the 
variable " + sop.getConditionVar() + ".");
+        }
+
+        int conditionalVarFieldPos = pos;
+
+        SplitOperatorDescriptor splitOpDesc = null;
+        splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, 
outputArity, intInsepctorFactory,
+                conditionalVarFieldPos);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7e83880..472dec7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
@@ -56,6 +57,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -63,7 +65,6 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -363,6 +364,13 @@
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Integer indent) throws 
AlgebricksException {
+        LogicalVariable conditionalSplitVar = op.getConditionVar();
+        addIndent(indent).append("split [" + conditionalSplitVar + "]");
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Integer 
indent) throws AlgebricksException {
         addIndent(indent).append("materialize");
         return null;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c0f9718..8fafabf 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -34,11 +34,11 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -46,6 +46,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -86,6 +87,8 @@
 
     public R visitReplicateOperator(ReplicateOperator op, T arg) throws 
AlgebricksException;
 
+    public R visitSplitOperator(SplitOperator op, T arg) throws 
AlgebricksException;
+
     public R visitMaterializeOperator(MaterializeOperator op, T arg) throws 
AlgebricksException;
 
     public R visitScriptOperator(ScriptOperator op, T arg) throws 
AlgebricksException;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 39cac06..1644d2b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -54,6 +54,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) 
throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
Void arg) throws AlgebricksException {
         return visit(op);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..4e3a0e4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -83,7 +83,7 @@
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -596,7 +596,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
-        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, 
stringRec, outputArity);
+        ReplicateOperatorDescriptor splitOp = new 
ReplicateOperatorDescriptor(spec, stringRec, outputArity);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index 6cfd941..6c6e5e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -71,5 +71,10 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+       <groupId>org.apache.hyracks</groupId>
+       <artifactId>algebricks-data</artifactId>
+       <version>0.2.18-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..181e2ec
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
+
+/**
+ * Abstract class for two replication related operator descriptor - replicate 
and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperatorDescriptor extends 
AbstractOperatorDescriptor {
+    protected static final long serialVersionUID = 1L;
+
+    protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+    protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+    protected final boolean[] outputMaterializationFlags;
+    protected final boolean requiresMaterialization;
+    protected final int numberOfNonMaterializedOutputs;
+    protected final int numberOfMaterializedOutputs;
+
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor rDesc,
+            int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor rDesc,
+            int outputArity, boolean[] outputMaterializationFlags) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+        this.outputMaterializationFlags = outputMaterializationFlags;
+
+        boolean reqMaterialization = false;
+        int matOutputs = 0;
+        int nonMatOutputs = 0;
+        for (boolean flag : outputMaterializationFlags) {
+            if (flag) {
+                reqMaterialization = true;
+                matOutputs++;
+            } else {
+                nonMatOutputs++;
+            }
+        }
+
+        this.requiresMaterialization = reqMaterialization;
+        this.numberOfMaterializedOutputs = matOutputs;
+        this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ReplicatorMaterializerActivityNode sma = new 
ReplicatorMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        int pipelineOutputIndex = 0;
+        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+        for (int i = 0; i < outputArity; i++) {
+            if (outputMaterializationFlags[i]) {
+                MaterializeReaderActivityNode mra = new 
MaterializeReaderActivityNode(
+                        new ActivityId(odId, activityId++));
+                builder.addActivity(this, mra);
+                builder.addBlockingEdge(sma, mra);
+                builder.addTargetEdge(i, mra, 0);
+            } else {
+                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
+            }
+        }
+    }
+
+    protected class ReplicatorMaterializerActivityNode extends 
AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public ReplicatorMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int 
partition, int nPartitions) {
+            return new AbstractUnaryInputOperatorNodePushable() {
+                private MaterializerTaskState state;
+                private final IFrameWriter[] writers = new 
IFrameWriter[numberOfNonMaterializedOutputs];
+                private final boolean[] isOpen = new 
boolean[numberOfNonMaterializedOutputs];
+
+                @Override
+                public void open() throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                                new TaskId(getActivityId(), partition), 
numberOfMaterializedOutputs);
+                        state.open(ctx);
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws 
HyracksDataException {
+                    if (requiresMaterialization) {
+                        state.appendFrame(bufferAccessor);
+                        bufferAccessor.clear();
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    try {
+                        if (requiresMaterialization) {
+                            state.close();
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; 
i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    writers[i].close();
+                                } catch (Throwable th) {
+                                    if (hde == null) {
+                                        hde = new HyracksDataException(th);
+                                    } else {
+                                        hde.addSuppressed(th);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter 
writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+
+    protected class MaterializeReaderActivityNode extends AbstractActivityNode 
{
+        private static final long serialVersionUID = 1L;
+
+        public MaterializeReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int 
partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    MaterializerTaskState state = (MaterializerTaskState) 
ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), 
SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    state.writeOut(writer, new VSizeFrame(ctx));
+                }
+
+            };
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..0782647
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import 
org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+
+public class ReplicateOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity,
+            boolean[] outputMaterializationFlags) {
+        super(spec, rDesc, outputArity, outputMaterializationFlags);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 67af861..99950c7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -20,86 +20,59 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
+/**
+ * Split operator propagates each tuple in a frame to one output branch only 
unlike Replicate operator.
+ */
+public class SplitOperatorDescriptor extends 
AbstractReplicateOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
-    private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
-
-    private final boolean[] outputMaterializationFlags;
-    private final boolean requiresMaterialization;
-    private final int numberOfNonMaterializedOutputs;
-    private final int numberOfMaterializedOutputs;
-
-    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity) {
-        this(spec, rDesc, outputArity, new boolean[outputArity]);
-    }
+    // The position of the field that has output branch information.
+    private int conditionalVarFieldPos = 0;
+    // The inspector used to check the value of the above field
+    private IBinaryIntegerInspectorFactory intInsepctorFactory;
 
     public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor rDesc, int outputArity,
-            boolean[] outputMaterializationFlags) {
-        super(spec, 1, outputArity);
-        for (int i = 0; i < outputArity; i++) {
-            recordDescriptors[i] = rDesc;
-        }
-        this.outputMaterializationFlags = outputMaterializationFlags;
-
-        boolean reqMaterialization = false;
-        int matOutputs = 0;
-        int nonMatOutputs = 0;
-        for (boolean flag : outputMaterializationFlags) {
-            if (flag) {
-                reqMaterialization = true;
-                matOutputs++;
-            } else {
-                nonMatOutputs++;
-            }
-        }
-
-        this.requiresMaterialization = reqMaterialization;
-        this.numberOfMaterializedOutputs = matOutputs;
-        this.numberOfNonMaterializedOutputs = nonMatOutputs;
-
+            IBinaryIntegerInspectorFactory intInsepctorFactory, int 
conditionalVarFieldPos) {
+        super(spec, rDesc, outputArity);
+        this.conditionalVarFieldPos = conditionalVarFieldPos;
+        this.intInsepctorFactory = intInsepctorFactory;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma =
-                new SplitterMaterializerActivityNode(new ActivityId(odId, 
SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma = new 
SplitterMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         int pipelineOutputIndex = 0;
-        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
         for (int i = 0; i < outputArity; i++) {
-            if (outputMaterializationFlags[i]) {
-                MaterializeReaderActivityNode mra =
-                        new MaterializeReaderActivityNode(new ActivityId(odId, 
activityId++));
-                builder.addActivity(this, mra);
-                builder.addBlockingEdge(sma, mra);
-                builder.addTargetEdge(i, mra, 0);
-            } else {
-                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
-            }
+            builder.addTargetEdge(i, sma, pipelineOutputIndex++);
         }
     }
 
-    private final class SplitterMaterializerActivityNode extends 
AbstractActivityNode {
+    // The difference between SplitterMaterializerActivityNode and 
ReplicatorMaterializerActivityNode is that
+    // SplitterMaterializerActivityNode propagates each tuple to one output 
branch only.
+    private final class SplitterMaterializerActivityNode extends 
ReplicatorMaterializerActivityNode {
         private static final long serialVersionUID = 1L;
 
         public SplitterMaterializerActivityNode(ActivityId id) {
@@ -113,15 +86,25 @@
                 private MaterializerTaskState state;
                 private final IFrameWriter[] writers = new 
IFrameWriter[numberOfNonMaterializedOutputs];
                 private final boolean[] isOpen = new 
boolean[numberOfNonMaterializedOutputs];
+                private FrameTupleAccessor accessor;
+                private ArrayTupleBuilder[] builders;
+                private GrowableArray[] builderDatas;
+                private FrameTupleAppender[] appenders;
+                IBinaryIntegerInspector intInsepctor = 
intInsepctorFactory.createBinaryIntegerInspector(ctx);
 
                 @Override
                 public void open() throws HyracksDataException {
-                    if (requiresMaterialization) {
-                        state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(),
-                                new TaskId(getActivityId(), partition), 
numberOfMaterializedOutputs);
-                        state.open(ctx);
-                    }
+                    // To deal with each tuple in a frame
+                    accessor = new FrameTupleAccessor(recordDescriptors[0]);
+                    builders = new 
ArrayTupleBuilder[numberOfNonMaterializedOutputs];
+                    builderDatas = new 
GrowableArray[numberOfNonMaterializedOutputs];
+                    appenders = new 
FrameTupleAppender[numberOfNonMaterializedOutputs];
+
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        builders[i] = new 
ArrayTupleBuilder(recordDescriptors[0].getFieldCount());
+                        builderDatas[i] = builders[i].getFieldData();
+                        appenders[i] = new FrameTupleAppender(new 
VSizeFrame(ctx), true);
+
                         isOpen[i] = true;
                         writers[i].open();
                     }
@@ -129,43 +112,38 @@
 
                 @Override
                 public void nextFrame(ByteBuffer bufferAccessor) throws 
HyracksDataException {
-                    if (requiresMaterialization) {
-                        state.appendFrame(bufferAccessor);
-                        bufferAccessor.clear();
-                    }
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
-                    }
-                }
+                    // Tuple based access
+                    accessor.reset(bufferAccessor);
+                    int tupleCount = accessor.getTupleCount();
+                    // The output branch number that starts from 0.
+                    int resultValue = 0;
 
-                @Override
-                public void flush() throws HyracksDataException {
-                    if (!requiresMaterialization) {
-                        for (IFrameWriter writer : writers) {
-                            writer.flush();
-                        }
+                    for (int i = 0; i < tupleCount; i++) {
+                        // Get the output branch number from the field in the 
given tuple.
+                        resultValue = 
intInsepctor.getIntegerValue(accessor.getBuffer().array(),
+                                accessor.getTupleStartOffset(i) + 
accessor.getFieldSlotsLength()
+                                        + accessor.getFieldStartOffset(i, 
conditionalVarFieldPos),
+                                accessor.getFieldLength(i, 
conditionalVarFieldPos));
+
+                        // Add this tuple to the correct output frame.
+                        // ProfileSW is added to measure the execution time 
when the profiler setting is enabled.
+                        FrameUtils.appendToWriter(writers[resultValue], 
appenders[resultValue], accessor, i);
                     }
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
                     HyracksDataException hde = null;
-                    try {
-                        if (requiresMaterialization) {
-                            state.close();
-                            ctx.setStateObject(state);
-                        }
-                    } finally {
-                        for (int i = 0; i < numberOfNonMaterializedOutputs; 
i++) {
-                            if (isOpen[i]) {
-                                try {
-                                    writers[i].close();
-                                } catch (Throwable th) {
-                                    if (hde == null) {
-                                        hde = new HyracksDataException(th);
-                                    } else {
-                                        hde.addSuppressed(th);
-                                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                appenders[i].write(writers[i], true);
+                                writers[i].close();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
                                 }
                             }
                         }
@@ -203,29 +181,4 @@
             };
         }
     }
-
-    private final class MaterializeReaderActivityNode extends 
AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MaterializeReaderActivityNode(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int 
partition, int nPartitions)
-                throws HyracksDataException {
-            return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
-                @Override
-                public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) 
ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), 
SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, new VSizeFrame(ctx));
-                }
-
-            };
-        }
-    }
-
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
index 40b4251..6191dcb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
 
@@ -86,7 +86,7 @@
                 inputSplits), stringParser, stringRec);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, 
locations);
 
-        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, 
stringRec, outputArity);
+        ReplicateOperatorDescriptor splitOp = new 
ReplicateOperatorDescriptor(spec, stringRec, outputArity);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, 
locations);
 
         IOperatorDescriptor outputOp[] = new 
IOperatorDescriptor[outputFile.length];

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1196
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangs...@yahoo.com>

Reply via email to