Taewoo Kim has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1196
Change subject: Index-only plan step 2: Added SplitOperator
......................................................................
Index-only plan step 2: Added SplitOperator
- Introduced SplitOperator, which sends each tuple to only one output frame, unlike
the ReplicateOperator,
which propagates each tuple to all output frames.
Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
M
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
33 files changed, 802 insertions(+), 246 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/96/1196/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index e04be6f..c5d379e 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -44,12 +44,12 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -57,6 +57,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -210,6 +211,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
return null;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 143cf0b..f4210e9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -26,8 +26,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.om.base.AString;
@@ -67,18 +67,19 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -519,6 +520,11 @@
}
@Override
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg)
throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op,
Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index c7b927e..4b2f5a8 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -59,6 +59,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -287,6 +288,11 @@
}
@Override
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg)
throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op,
Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ef0f9da..8147148 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -33,17 +33,18 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -163,6 +164,11 @@
}
@Override
+ public Boolean visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
public Boolean visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
return visitInputs(op);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index d39a8c8..c12ab60 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -45,6 +45,7 @@
SCRIPT,
SELECT,
SINK,
+ SPLIT,
SUBPLAN,
TOKENIZE,
UNIONALL,
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 7d5cf16..de6852e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -53,6 +53,7 @@
RANDOM_MERGE_EXCHANGE,
RANGE_PARTITION_EXCHANGE,
RANGE_PARTITION_MERGE_EXCHANGE,
+ REPLICATE,
RTREE_SEARCH,
RUNNING_AGGREGATE,
SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
new file mode 100644
index 0000000..adf6945
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Abstract base class for the two replication-related operators: replicate and split.
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperator extends
AbstractLogicalOperator {
+
+ private int outputArity;
+ private boolean[] outputMaterializationFlags;
+ private List<Mutable<ILogicalOperator>> outputs;
+
+ public AbstractReplicateOperator(int outputArity) {
+ this.outputArity = outputArity;
+ this.outputMaterializationFlags = new boolean[outputArity];
+ this.outputs = new ArrayList<>();
+ }
+
+ public AbstractReplicateOperator(int outputArity, boolean[]
outputMaterializationFlags) {
+ this.outputArity = outputArity;
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ this.outputs = new ArrayList<>();
+ }
+
+ @Override
+ public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return VariablePropagationPolicy.ALL;
+ }
+
+ @Override
+ public boolean isMap() {
+ return true;
+ }
+
+ @Override
+ public void recomputeSchema() {
+ schema = new
ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ }
+
+ public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+ // do nothing
+ }
+
+ public int getOutputArity() {
+ return outputArity;
+ }
+
+ public void setOutputMaterializationFlags(boolean[]
outputMaterializationFlags) {
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ }
+
+ public boolean[] getOutputMaterializationFlags() {
+ return outputMaterializationFlags;
+ }
+
+ public List<Mutable<ILogicalOperator>> getOutputs() {
+ return outputs;
+ }
+
+ @Override
+ public IVariableTypeEnvironment
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ return createPropagatingAllInputsTypeEnvironment(ctx);
+ }
+
+ public boolean isBlocker() {
+ for (boolean requiresMaterialization : outputMaterializationFlags) {
+ if (requiresMaterialization) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 834107c..3c1b8b2 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -18,36 +18,18 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class ReplicateOperator extends AbstractLogicalOperator {
-
- private int outputArity;
- private boolean[] outputMaterializationFlags;
- private List<Mutable<ILogicalOperator>> outputs;
+public class ReplicateOperator extends AbstractReplicateOperator {
public ReplicateOperator(int outputArity) {
- this.outputArity = outputArity;
- this.outputMaterializationFlags = new boolean[outputArity];
- this.outputs = new ArrayList<>();
+ super(outputArity);
}
public ReplicateOperator(int outputArity, boolean[]
outputMaterializationFlags) {
- this.outputArity = outputArity;
- this.outputMaterializationFlags = outputMaterializationFlags;
- this.outputs = new ArrayList<>();
+ super(outputArity, outputMaterializationFlags);
}
@Override
@@ -58,60 +40,5 @@
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg)
throws AlgebricksException {
return visitor.visitReplicateOperator(this, arg);
- }
-
- @Override
- public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
- throws AlgebricksException {
- return false;
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public boolean isMap() {
- return true;
- }
-
- @Override
- public void recomputeSchema() {
- schema = new
ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
- }
-
- public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
- // do nothing
- }
-
- public int getOutputArity() {
- return outputArity;
- }
-
- public void setOutputMaterializationFlags(boolean[]
outputMaterializationFlags) {
- this.outputMaterializationFlags = outputMaterializationFlags;
- }
-
- public boolean[] getOutputMaterializationFlags() {
- return outputMaterializationFlags;
- }
-
- public List<Mutable<ILogicalOperator>> getOutputs() {
- return outputs;
- }
-
- @Override
- public IVariableTypeEnvironment
computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
- public boolean isBlocker() {
- for (boolean requiresMaterialization : outputMaterializationFlags) {
- if (requiresMaterialization) {
- return true;
- }
- }
- return false;
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
new file mode 100644
index 0000000..90a358f
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * The Split operator takes a variable as its condition variable.
+ * Based on its value, it propagates each tuple to the corresponding output
frame (e.g., first output = 0, ...).
+ * Thus, unlike the Replicate operator, which propagates unconditionally to all
outputs,
+ * this operator performs a conditional propagation.
+ *
+ * @author waans11
+ */
+public class SplitOperator extends AbstractReplicateOperator {
+
+ // variable that keeps the output branch information for each tuple
+ private LogicalVariable conditionVar;
+
+ public SplitOperator(int outputArity, LogicalVariable conditionVar) {
+ super(outputArity);
+ this.conditionVar = conditionVar;
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.SPLIT;
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg)
throws AlgebricksException {
+ return visitor.visitSplitOperator(this, arg);
+ }
+
+ public LogicalVariable getConditionVar() {
+ return conditionVar;
+ }
+
+ // Get the field position of the conditional variable.
+ public int getConditionVarFieldPos() throws AlgebricksException {
+ if (schema != null) {
+ return schema.indexOf(conditionVar);
+ } else {
+ throw new AlgebricksException("SPLIT operator: can't get the
position of conditional variable.");
+ }
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index b999493..7b19f08 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -55,6 +55,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
}
@Override
+ public Long visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException {
+ return op.getInputs().get(0).getValue().accept(this, arg);
+ }
+
+ @Override
public Long visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
return op.getInputs().get(0).getValue().accept(this, arg);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 5e2a041..4370fc4 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -74,6 +74,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -423,6 +424,12 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext ctx)
throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op,
IOptimizationContext ctx) throws AlgebricksException {
propagateFDsAndEquivClasses(op, ctx);
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c0e8b34..95110eb 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -62,6 +62,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -298,6 +299,14 @@
}
@Override
+ public Boolean visitSplitOperator(SplitOperator op, ILogicalOperator arg)
throws AlgebricksException {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ // Only the operator tag is compared: arg is accepted iff it is a SPLIT as well.
+ // NOTE(review): output arity and the condition variable are not compared here - confirm that is intended.
+ return aop.getOperatorTag() == LogicalOperatorTag.SPLIT;
+ }
+
+ @Override
public Boolean visitMaterializeOperator(MaterializeOperator op,
ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 965008c..7fa98a3 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -22,8 +22,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -64,6 +64,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -178,6 +179,12 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, ILogicalOperator arg)
throws AlgebricksException { // variable mapping between a SPLIT operator and its counterpart arg
+ mapVariablesStandard(op, arg); // same standard mapping helper the MATERIALIZE visitor right below uses
+ return null; // Void visitor: nothing to return
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op,
ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 4241d84..fd23ed4 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -52,18 +52,19 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -410,6 +411,13 @@
}
@Override
+ public ILogicalOperator visitSplitOperator(SplitOperator op,
ILogicalOperator arg) throws AlgebricksException { // builds the copy of a SPLIT operator
+ SplitOperator opCopy = new SplitOperator(op.getOutputArity(),
op.getConditionVar()); // NOTE(review): the condition variable is reused as-is, not remapped to a new variable - confirm this is correct for a copy "with new variables"
+ deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); // shared helper; going by its name it copies inputs, annotations and execution mode onto opCopy
+ return opCopy;
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op,
ILogicalOperator arg)
throws AlgebricksException {
MaterializeOperator opCopy = new MaterializeOperator();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8d3644d..995a4cb 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -54,6 +54,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -193,6 +194,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg)
throws AlgebricksException { // SPLIT hook of the logical-properties visitor
+ return null; // no-op: this visitor computes nothing for SPLIT
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op,
IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fde6a28..07a3e63 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -52,6 +52,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -59,6 +60,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -66,7 +68,6 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -180,6 +181,11 @@
}
@Override
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg)
throws AlgebricksException { // copy of a SPLIT operator; arg is unused
+ return new SplitOperator(op.getOutputArity(), op.getConditionVar()); // new operator with the same output arity and condition variable; nothing else of op is transferred in this method
+ }
+
+ @Override
public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg)
throws AlgebricksException {
ArrayList<LogicalVariable> newInputList = new ArrayList<>();
ArrayList<LogicalVariable> newOutputList = new ArrayList<>();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f01b20f..4575fa2 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -56,6 +56,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg)
throws AlgebricksException { // SPLIT hook of the primary-key variables visitor
+ return null; // no-op, exactly like the MATERIALIZE visitor right below
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op,
IOptimizationContext ctx) throws AlgebricksException {
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 10659b1..fe38f13 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -48,11 +48,11 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -60,6 +60,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -255,6 +256,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException { // SPLIT hook of the produced-variables visitor
+ return null; // no-op: nothing is recorded for SPLIT, same as the MATERIALIZE visitor right below
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index d35153a..9cc56ef 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -46,11 +46,11 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -58,6 +58,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -288,6 +289,12 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException { // schema variables of a SPLIT operator
+ standardLayout(op); // same layout helper the MATERIALIZE visitor right below uses
+ return null; // Void visitor: nothing to return
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
standardLayout(op);
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index f2da7c4..3b75746 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -46,12 +46,12 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -59,6 +59,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -414,6 +415,13 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable,
LogicalVariable> arg)
+ throws AlgebricksException { // variable substitution inside a SPLIT operator
+ op.substituteVar(arg.first, arg.second); // hands both variables of the pair to the operator's own substituteVar
+ return null; // Void visitor: nothing to return
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op,
Pair<LogicalVariable, LogicalVariable> arg)
throws AlgebricksException {
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0d50dc2..ab6b01f 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -59,6 +59,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -443,6 +444,15 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws
AlgebricksException { // collects the variables used by a SPLIT operator into usedVariables
+ for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) { // every operator attached to one of the outputs
+ VariableUtilities.getUsedVariables(outputOp.getValue(),
usedVariables); // adds that output operator's used variables to the same collection
+ }
+ usedVariables.add(op.getConditionVar()); // the condition variable is recorded as used as well, after the outputs
+ return null; // Void visitor: the result lives in usedVariables
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg)
throws AlgebricksException {
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
new file mode 100644
index 0000000..1d13163
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractReplicatePOperator extends
AbstractPhysicalOperator { // behaviour shared by ReplicatePOperator and SplitPOperator, which both extend this class
+
+ @Override
+ public boolean isMicroOperator() {
+ return false; // never reported as a micro operator
+ }
+
+ @Override
+ public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
+ return emptyUnaryRequirements(); // reqdByParent is ignored; the result comes straight from the emptyUnaryRequirements() helper
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue(); // the operator feeding input 0
+ deliveredProperties = (StructuralPropertiesVector)
op2.getDeliveredPhysicalProperties().clone(); // deliver a clone of what input 0 delivers
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator
op) {
+ int[] inputDependencyLabels = new int[] { 0 }; // one input, labelled 0
+ AbstractReplicateOperator rop = (AbstractReplicateOperator) op; // callers must pass a replicate-family logical operator, otherwise this cast fails
+ int[] outputDependencyLabels = new int[rop.getOutputArity()]; // one label per output, 0 by default
+ // change the labels of outputs that require materialization to 1
+ boolean[] outputMaterializationFlags =
rop.getOutputMaterializationFlags();
+ for (int i = 0; i < rop.getOutputArity(); i++) {
+ if (outputMaterializationFlags[i]) {
+ outputDependencyLabels[i] = 1;
+ }
+ }
+ return new Pair<>(inputDependencyLabels, outputDependencyLabels); // (input labels, output labels)
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false; // always answers false
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index db778f7..74739da 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -19,45 +19,23 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
-public class ReplicatePOperator extends AbstractPhysicalOperator {
+public class ReplicatePOperator extends AbstractReplicatePOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.SPLIT;
- }
-
- @Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector)
op2.getDeliveredPhysicalProperties().clone();
+ return PhysicalOperatorTag.REPLICATE;
}
@Override
@@ -65,31 +43,18 @@
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
+ RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
ReplicateOperator rop = (ReplicateOperator) op;
int outputArity = rop.getOutputArity();
boolean[] outputMaterializationFlags =
rop.getOutputMaterializationFlags();
- SplitOperatorDescriptor splitOpDesc = new
SplitOperatorDescriptor(spec, recDescriptor, outputArity,
outputMaterializationFlags);
+ ReplicateOperatorDescriptor splitOpDesc = new
ReplicateOperatorDescriptor(spec, recDescriptor, outputArity,
+ outputMaterializationFlags);
contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
- }
-
- @Override
- public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator
op) {
- int[] inputDependencyLabels = new int[] { 0 };
- ReplicateOperator rop = (ReplicateOperator) op;
- int[] outputDependencyLabels = new int[rop.getOutputArity()];
- // change the labels of outputs that requires materialization to 1
- boolean[] outputMaterializationFlags =
rop.getOutputMaterializationFlags();
- for (int i = 0; i < rop.getOutputArity(); i++) {
- if (outputMaterializationFlags[i]) {
- outputDependencyLabels[i] = 1;
- }
- }
- return new Pair<>(inputDependencyLabels, outputDependencyLabels);
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
new file mode 100644
index 0000000..7ad8c52
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+/**
+ * Physical operator for the logical SplitOperator. It generates a SplitOperatorDescriptor that is
+ * given the field position of the operator's condition variable and an integer inspector factory
+ * for reading that field.
+ */
+public class SplitPOperator extends AbstractReplicatePOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SPLIT;
+    }
+
+    /**
+     * Adds the runtime SplitOperatorDescriptor for {@code op} to the job and connects it to the
+     * operator's first input.
+     *
+     * @throws AlgebricksException
+     *             if the condition variable cannot be found in the first input schema
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+
+        // Inspector factory handed to the descriptor to read the value of the condition variable field.
+        IBinaryIntegerInspectorFactory intInspectorFactory = context.getBinaryIntegerInspectorFactory();
+
+        SplitOperator sop = (SplitOperator) op;
+        int outputArity = sop.getOutputArity();
+
+        // Field position of the condition variable in the first input schema; negative means "not found".
+        int conditionalVarFieldPos = inputSchemas[0].findVariable(sop.getConditionVar());
+        if (conditionalVarFieldPos < 0) {
+            throw new AlgebricksException(
+                    "SPLIT operator: could not find the variable " + sop.getConditionVar() + ".");
+        }
+
+        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
+                intInspectorFactory, conditionalVarFieldPos);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7e83880..472dec7 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
@@ -56,6 +57,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -63,7 +65,6 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -363,6 +364,13 @@
}
@Override
+    // Prints the operator as "split [<condition variable>]" at the requested indentation.
+    public Void visitSplitOperator(SplitOperator op, Integer indent) throws AlgebricksException {
+        final LogicalVariable conditionVar = op.getConditionVar();
+        addIndent(indent).append("split [" + conditionVar + "]");
+        return null;
+    }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Integer
indent) throws AlgebricksException {
addIndent(indent).append("materialize");
return null;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c0f9718..8fafabf 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -34,11 +34,11 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
@@ -46,6 +46,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -86,6 +87,8 @@
public R visitReplicateOperator(ReplicateOperator op, T arg) throws
AlgebricksException;
+ public R visitSplitOperator(SplitOperator op, T arg) throws
AlgebricksException;
+
public R visitMaterializeOperator(MaterializeOperator op, T arg) throws
AlgebricksException;
public R visitScriptOperator(ScriptOperator op, T arg) throws
AlgebricksException;
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 39cac06..1644d2b 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -54,6 +54,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -185,6 +186,11 @@
}
@Override
+ // Delegates to the shared visit(op) helper, the same way visitMaterializeOperator below does.
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg)
throws AlgebricksException {
+ return visit(op);
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op,
Void arg) throws AlgebricksException {
return visit(op);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..4e3a0e4 100644
---
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -83,7 +83,7 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
import
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -596,7 +596,7 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
- SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec,
stringRec, outputArity);
+ ReplicateOperatorDescriptor splitOp = new
ReplicateOperatorDescriptor(spec, stringRec, outputArity);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index 6cfd941..6c6e5e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -71,5 +71,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <!-- Needed because SplitOperatorDescriptor imports IBinaryIntegerInspector(Factory) from algebricks-data. -->
+ <!-- NOTE(review): the version is pinned to a SNAPSHOT literal; confirm whether it should follow the module version. -->
+ <!-- NOTE(review): this makes hyracks-dataflow-std depend on an algebricks module; confirm the direction is intended. -->
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-data</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..181e2ec
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
+
+/**
+ * Abstract base class for the two replication-related operator descriptors: replicate and split.
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperatorDescriptor extends
AbstractOperatorDescriptor {
+ // NOTE(review): serialVersionUID is conventionally private; the subclasses in this change declare their own.
+ protected static final long serialVersionUID = 1L;
+
+ // Id of the activity that consumes the input. Reader activities are numbered from
+ // MATERIALIZE_READER_ACTIVITY_ID upwards in contributeActivities().
+ protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+ protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+ // One flag per output; a set flag routes that output through a MaterializeReaderActivityNode.
+ protected final boolean[] outputMaterializationFlags;
+ // True when at least one flag is set.
+ protected final boolean requiresMaterialization;
+ // Number of cleared / set flags, counted once in the constructor.
+ protected final int numberOfNonMaterializedOutputs;
+ protected final int numberOfMaterializedOutputs;
+
+ /**
+ * Creates a descriptor with a freshly allocated flag array, i.e. every flag is false and no
+ * output is materialized.
+ */
+ public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor rDesc,
+ int outputArity) {
+ this(spec, rDesc, outputArity, new boolean[outputArity]);
+ }
+
+ /**
+ * Registers rDesc as the record descriptor of each of the outputArity outputs and counts how
+ * many outputs are flagged for materialization.
+ * NOTE(review): outputMaterializationFlags is presumably expected to hold outputArity entries - confirm.
+ */
+ public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor rDesc,
+ int outputArity, boolean[] outputMaterializationFlags) {
+ super(spec, 1, outputArity);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = rDesc;
+ }
+ this.outputMaterializationFlags = outputMaterializationFlags;
+
+ boolean reqMaterialization = false;
+ int matOutputs = 0;
+ int nonMatOutputs = 0;
+ for (boolean flag : outputMaterializationFlags) {
+ if (flag) {
+ reqMaterialization = true;
+ matOutputs++;
+ } else {
+ nonMatOutputs++;
+ }
+ }
+
+ this.requiresMaterialization = reqMaterialization;
+ this.numberOfMaterializedOutputs = matOutputs;
+ this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
+ }
+
+ /**
+ * Contributes one ReplicatorMaterializerActivityNode fed by input 0, plus one
+ * MaterializeReaderActivityNode per flagged output. Each reader is connected to the first
+ * activity by a blocking edge; unflagged outputs are wired directly to the first activity
+ * using consecutive output indexes.
+ */
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ ReplicatorMaterializerActivityNode sma = new
ReplicatorMaterializerActivityNode(
+ new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ builder.addActivity(this, sma);
+ builder.addSourceEdge(0, sma, 0);
+ int pipelineOutputIndex = 0;
+ int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+ for (int i = 0; i < outputArity; i++) {
+ if (outputMaterializationFlags[i]) {
+ MaterializeReaderActivityNode mra = new
MaterializeReaderActivityNode(
+ new ActivityId(odId, activityId++));
+ builder.addActivity(this, mra);
+ builder.addBlockingEdge(sma, mra);
+ builder.addTargetEdge(i, mra, 0);
+ } else {
+ builder.addTargetEdge(i, sma, pipelineOutputIndex++);
+ }
+ }
+ }
+
+ // Activity that receives the input frames: it appends them to a MaterializerTaskState when
+ // materialization is required and forwards them to every non-materialized output writer.
+ protected class ReplicatorMaterializerActivityNode extends
AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public ReplicatorMaterializerActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final
IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int
partition, int nPartitions) {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private MaterializerTaskState state;
+ // Writers of the non-materialized outputs, filled in by setOutputFrameWriter().
+ private final IFrameWriter[] writers = new
IFrameWriter[numberOfNonMaterializedOutputs];
+ // Tracks which writers close() and fail() have to visit.
+ private final boolean[] isOpen = new
boolean[numberOfNonMaterializedOutputs];
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (requiresMaterialization) {
+ state = new
MaterializerTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition),
numberOfMaterializedOutputs);
+ state.open(ctx);
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ // The flag is set before open(), so close()/fail() still visit a writer whose open() threw.
+ isOpen[i] = true;
+ writers[i].open();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer bufferAccessor) throws
HyracksDataException {
+ // Store the frame first when any output is materialized, then forward it to each writer.
+ if (requiresMaterialization) {
+ state.appendFrame(bufferAccessor);
+ bufferAccessor.clear();
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ FrameUtils.flushFrame(bufferAccessor, writers[i]);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ writers[i].flush();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ HyracksDataException hde = null;
+ try {
+ if (requiresMaterialization) {
+ // Publish the closed state so the reader activities can fetch it from the context.
+ state.close();
+ ctx.setStateObject(state);
+ }
+ } finally {
+ // Every opened writer is closed; the first failure is wrapped, later ones are suppressed.
+ // NOTE(review): if the try block itself throws, hde is collected but never rethrown.
+ for (int i = 0; i < numberOfNonMaterializedOutputs;
i++) {
+ if (isOpen[i]) {
+ try {
+ writers[i].close();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // Same collection scheme as close(): keep going after a failure and report once at the end.
+ HyracksDataException hde = null;
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ if (isOpen[i]) {
+ try {
+ writers[i].fail();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter
writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+ }
+
+ // Source activity for one materialized output: it fetches the MaterializerTaskState stored
+ // under the first activity's TaskId for the same partition and writes it to its writer.
+ protected class MaterializeReaderActivityNode extends AbstractActivityNode
{
+ private static final long serialVersionUID = 1L;
+
+ public MaterializeReaderActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final
IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int
partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ MaterializerTaskState state = (MaterializerTaskState)
ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(),
SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+ state.writeOut(writer, new VSizeFrame(ctx));
+ }
+
+ };
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..0782647
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import
org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+
+/**
+ * Concrete replicate descriptor. It adds no behaviour of its own: both constructors end up in
+ * the four-argument constructor of AbstractReplicateOperatorDescriptor.
+ */
+public class ReplicateOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates a descriptor with an explicit materialization flag per output. */
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            boolean[] outputMaterializationFlags) {
+        super(spec, rDesc, outputArity, outputMaterializationFlags);
+    }
+
+    /** Creates a descriptor with a fresh flag array, so no output is flagged for materialization. */
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 67af861..99950c7 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -20,86 +20,59 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import
org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
+/**
+ * Split operator propagates each tuple in a frame to one output branch only
unlike Replicate operator.
+ */
+public class SplitOperatorDescriptor extends
AbstractReplicateOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
- private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
-
- private final boolean[] outputMaterializationFlags;
- private final boolean requiresMaterialization;
- private final int numberOfNonMaterializedOutputs;
- private final int numberOfMaterializedOutputs;
-
- public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor rDesc, int outputArity) {
- this(spec, rDesc, outputArity, new boolean[outputArity]);
- }
+ // The position of the field that has output branch information.
+ private int conditionalVarFieldPos = 0;
+ // The inspector used to check the value of the above field
+ private IBinaryIntegerInspectorFactory intInsepctorFactory;
public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor rDesc, int outputArity,
- boolean[] outputMaterializationFlags) {
- super(spec, 1, outputArity);
- for (int i = 0; i < outputArity; i++) {
- recordDescriptors[i] = rDesc;
- }
- this.outputMaterializationFlags = outputMaterializationFlags;
-
- boolean reqMaterialization = false;
- int matOutputs = 0;
- int nonMatOutputs = 0;
- for (boolean flag : outputMaterializationFlags) {
- if (flag) {
- reqMaterialization = true;
- matOutputs++;
- } else {
- nonMatOutputs++;
- }
- }
-
- this.requiresMaterialization = reqMaterialization;
- this.numberOfMaterializedOutputs = matOutputs;
- this.numberOfNonMaterializedOutputs = nonMatOutputs;
-
+ IBinaryIntegerInspectorFactory intInsepctorFactory, int
conditionalVarFieldPos) {
+ super(spec, rDesc, outputArity);
+ this.conditionalVarFieldPos = conditionalVarFieldPos;
+ this.intInsepctorFactory = intInsepctorFactory;
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SplitterMaterializerActivityNode sma =
- new SplitterMaterializerActivityNode(new ActivityId(odId,
SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ SplitterMaterializerActivityNode sma = new
SplitterMaterializerActivityNode(
+ new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
int pipelineOutputIndex = 0;
- int activityId = MATERIALIZE_READER_ACTIVITY_ID;
for (int i = 0; i < outputArity; i++) {
- if (outputMaterializationFlags[i]) {
- MaterializeReaderActivityNode mra =
- new MaterializeReaderActivityNode(new ActivityId(odId,
activityId++));
- builder.addActivity(this, mra);
- builder.addBlockingEdge(sma, mra);
- builder.addTargetEdge(i, mra, 0);
- } else {
- builder.addTargetEdge(i, sma, pipelineOutputIndex++);
- }
+ builder.addTargetEdge(i, sma, pipelineOutputIndex++);
}
}
- private final class SplitterMaterializerActivityNode extends
AbstractActivityNode {
+ // The difference between SplitterMaterializerActivityNode and
ReplicatorMaterializerActivityNode is that
+ // SplitterMaterializerActivityNode propagates each tuple to one output
branch only.
+ private final class SplitterMaterializerActivityNode extends
ReplicatorMaterializerActivityNode {
private static final long serialVersionUID = 1L;
public SplitterMaterializerActivityNode(ActivityId id) {
@@ -113,15 +86,25 @@
private MaterializerTaskState state;
private final IFrameWriter[] writers = new
IFrameWriter[numberOfNonMaterializedOutputs];
private final boolean[] isOpen = new
boolean[numberOfNonMaterializedOutputs];
+ private FrameTupleAccessor accessor;
+ private ArrayTupleBuilder[] builders;
+ private GrowableArray[] builderDatas;
+ private FrameTupleAppender[] appenders;
+ IBinaryIntegerInspector intInsepctor =
intInsepctorFactory.createBinaryIntegerInspector(ctx);
@Override
public void open() throws HyracksDataException {
- if (requiresMaterialization) {
- state = new
MaterializerTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition),
numberOfMaterializedOutputs);
- state.open(ctx);
- }
+ // To deal with each tuple in a frame
+ accessor = new FrameTupleAccessor(recordDescriptors[0]);
+ builders = new
ArrayTupleBuilder[numberOfNonMaterializedOutputs];
+ builderDatas = new
GrowableArray[numberOfNonMaterializedOutputs];
+ appenders = new
FrameTupleAppender[numberOfNonMaterializedOutputs];
+
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ builders[i] = new
ArrayTupleBuilder(recordDescriptors[0].getFieldCount());
+ builderDatas[i] = builders[i].getFieldData();
+ appenders[i] = new FrameTupleAppender(new
VSizeFrame(ctx), true);
+
isOpen[i] = true;
writers[i].open();
}
@@ -129,43 +112,38 @@
@Override
public void nextFrame(ByteBuffer bufferAccessor) throws
HyracksDataException {
- if (requiresMaterialization) {
- state.appendFrame(bufferAccessor);
- bufferAccessor.clear();
- }
- for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
- FrameUtils.flushFrame(bufferAccessor, writers[i]);
- }
- }
+ // Tuple based access
+ accessor.reset(bufferAccessor);
+ int tupleCount = accessor.getTupleCount();
+ // The output branch number that starts from 0.
+ int resultValue = 0;
- @Override
- public void flush() throws HyracksDataException {
- if (!requiresMaterialization) {
- for (IFrameWriter writer : writers) {
- writer.flush();
- }
+ for (int i = 0; i < tupleCount; i++) {
+ // Get the output branch number from the field in the
given tuple.
+ resultValue =
intInsepctor.getIntegerValue(accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(i) +
accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(i,
conditionalVarFieldPos),
+ accessor.getFieldLength(i,
conditionalVarFieldPos));
+
+ // Add this tuple to the correct output frame.
+ // NOTE(review): the "ProfileSW" profiling remark that stood here matches no code in this
+ // method; resultValue is used as the branch index without a range check - confirm callers.
+ FrameUtils.appendToWriter(writers[resultValue],
appenders[resultValue], accessor, i);
}
}
@Override
public void close() throws HyracksDataException {
HyracksDataException hde = null;
- try {
- if (requiresMaterialization) {
- state.close();
- ctx.setStateObject(state);
- }
- } finally {
- for (int i = 0; i < numberOfNonMaterializedOutputs;
i++) {
- if (isOpen[i]) {
- try {
- writers[i].close();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ if (isOpen[i]) {
+ try {
+ appenders[i].write(writers[i], true);
+ writers[i].close();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
}
}
}
@@ -203,29 +181,4 @@
};
}
}
-
- private final class MaterializeReaderActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public MaterializeReaderActivityNode(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
- @Override
- public void initialize() throws HyracksDataException {
- MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
- new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
- state.writeOut(writer, new VSizeFrame(ctx));
- }
-
- };
- }
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
index 40b4251..6191dcb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
@@ -40,7 +40,7 @@
import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
@@ -86,7 +86,7 @@
inputSplits), stringParser, stringRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
- SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+ ReplicateOperatorDescriptor splitOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
--
To view, visit https://asterix-gerrit.ics.uci.edu/1196
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <[email protected]>