Murtadha Hubail has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3338 )
Change subject: [NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
......................................................................

[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Abstract the activities out of the ForwardOperatorDescriptor so we can reuse the same basic framework for multiple forward Operators.
- Abstract the ForwardOperatorDescriptor out of the ForwardPOperator so we can reuse the same basic framework for multiple forward Operators.

Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3338
Contrib: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
R 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java R hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java 16 files changed, 174 insertions(+), 75 deletions(-) Approvals: Jenkins: Verified; ; Verified Ali Alsuliman: Looks good to me, approved Anon. E. 
Moose (1000171): Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java index 86b2b88..2d5e11e 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java @@ -310,7 +310,7 @@ @Override public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { - sweepExpression(op.getRangeMapExpression().getValue()); + sweepExpression(op.getSideDataExpression().getValue()); return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java index bc24b49..70e36e5 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java @@ -33,30 +33,30 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; /** - * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically - * by doing a pass over the data itself to infer the range map. The operator takes two inputs: + * Forward operator is used to forward data to different NCs based on the side data activity that is computed + * dynamically by doing a pass over the data itself to infer the range map. The operator takes two inputs: * 1. 
Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC. - * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up. - * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which - * the forward operator will start forwarding the data. + * 2. Side Activity (at index 1). The output will be stored in Hyracks context, and the connector will pick it up. + * Forward operator will receive the range map when it is broadcast by the operator generating the side activity output + * after which the forward operator will start forwarding the data. */ public class ForwardOperator extends AbstractLogicalOperator { - private final String rangeMapKey; - private final Mutable<ILogicalExpression> rangeMapExpression; + private final String sideDataKey; + private final Mutable<ILogicalExpression> sideDataExpression; - public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) { + public ForwardOperator(String sideDataKey, Mutable<ILogicalExpression> sideDataExpression) { super(); - this.rangeMapKey = rangeMapKey; - this.rangeMapExpression = rangeMapExpression; + this.sideDataKey = sideDataKey; + this.sideDataExpression = sideDataExpression; } - public String getRangeMapKey() { - return rangeMapKey; + public String getSideDataKey() { + return sideDataKey; } - public Mutable<ILogicalExpression> getRangeMapExpression() { - return rangeMapExpression; + public Mutable<ILogicalExpression> getSideDataExpression() { + return sideDataExpression; } @Override @@ -72,7 +72,7 @@ @Override public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { - return visitor.transform(rangeMapExpression); + return visitor.transform(sideDataExpression); } @Override diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index dd3053b..09358dd 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -605,9 +605,9 @@ return Boolean.FALSE; } ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg); - ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue(); - ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue(); - return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey()); + ILogicalExpression rangeMapExp = op.getSideDataExpression().getValue(); + ILogicalExpression otherRangeMapExp = otherOp.getSideDataExpression().getValue(); + return rangeMapExp.equals(otherRangeMapExp) && op.getSideDataKey().equals(otherOp.getSideDataKey()); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 99e852d..34b0ae6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -590,8 +590,8 @@ @Override public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException { - ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(), - exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression())); + ForwardOperator opCopy = new ForwardOperator(op.getSideDataKey(), + exprDeepCopyVisitor.deepCopyExpressionReference(op.getSideDataExpression())); deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); return opCopy; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index 5be91cc..1727d10 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -345,7 +345,7 @@ @Override public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { - return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression())); + return new ForwardOperator(op.getSideDataKey(), deepCopyExpressionRef(op.getSideDataExpression())); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index 550a208..028bf9f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -454,7 +454,7 @@ @Override public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg) throws AlgebricksException { - op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second); + op.getSideDataExpression().getValue().substituteVar(arg.first, arg.second); substVarTypes(op, arg); return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index 39b9689..845a853 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -428,7 +428,7 @@ @Override public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { - op.getRangeMapExpression().getValue().getUsedVariables(usedVariables); + op.getSideDataExpression().getValue().getUsedVariables(usedVariables); return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java similarity index 83% rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java index 11c584e..778af18 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java @@ -39,17 +39,17 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor; /** * <pre> - * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}} - * idx0: Input data source -- - * |-- forward op. - * idx1: RangeMap generator-- + * {@see {@link ForwardOperator} and {@link AbstractForwardOperatorDescriptor}} + * idx0: Input data source -- + * |-- forward op. + * idx1: Side activity output -- * </pre> */ -public class ForwardPOperator extends AbstractPhysicalOperator { +public abstract class AbstractForwardPOperator extends AbstractPhysicalOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -57,8 +57,18 @@ } /** - * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at - * the data source input. 
+ * Get the correct Forward Operator Descriptor + * @param builder Hyracks job builder + * @param forwardOp Forward Operator + * @param dataInputDescriptor Data input descriptor + * @return return the correct operator descriptor + */ + public abstract AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder, + ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor); + + /** + * Forward operator requires that the global aggregate operator broadcasts side activity output. + * No required properties at the data source input. * @param op {@see {@link ForwardOperator}} * @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan * @param context the optimization context @@ -67,7 +77,7 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) { - // broadcast the range map to the cluster node domain + // broadcast the side activity output to the cluster node domain INodeDomain targetDomain = context.getComputationNodeDomain(); List<ILocalStructuralProperty> noProp = new ArrayList<>(); StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2]; @@ -108,13 +118,13 @@ ForwardOperator forwardOp = (ForwardOperator) op; RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor( context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context); - ForwardOperatorDescriptor forwardDescriptor = - new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor); + AbstractForwardOperatorDescriptor forwardDescriptor = + getOperatorDescriptor(builder, forwardOp, dataInputDescriptor); builder.contributeHyracksOperator(forwardOp, forwardDescriptor); ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue(); builder.contributeGraphEdge(dataSource, 0, forwardOp, 0); - ILogicalOperator 
rangemapSource = forwardOp.getInputs().get(1).getValue(); - builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1); + ILogicalOperator sideDataSource = forwardOp.getInputs().get(1).getValue(); + builder.contributeGraphEdge(sideDataSource, 0, forwardOp, 1); } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java new file mode 100644 index 0000000..0903b5e --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor; + +public class SortForwardPOperator extends AbstractForwardPOperator { + + @Override + public AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder, + ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor) { + return new SortForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getSideDataKey(), dataInputDescriptor); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index 27d4ced..4128c8b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -467,7 +467,7 @@ @Override public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException { addIndent(indent) - .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent)); + .append("forward: range-map = " + op.getSideDataExpression().getValue().accept(exprVisitor, indent)); return null; } diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index f5ff12f..e56a8bd 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -639,7 +639,7 @@ addIndent(indent).append("\"operator\": \"forward\""); addIndent(0).append(",\n"); addIndent(indent).append("\"expressions\": \"" - + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\""); + + op.getSideDataExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\""); return null; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java index 1c761fa..795fb10 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java @@ -590,7 +590,7 @@ @Override public String visitForwardOperator(ForwardOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); - stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")"); + stringBuilder.append("forward(").append(op.getSideDataExpression().getValue().toString()).append(")"); 
appendSchema(op, showDetails); appendAnnotations(op, showDetails); appendPhysicalOperatorInfo(op, showDetails); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index f66a6b8..7dc596c 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -65,7 +65,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; @@ -76,6 +75,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; @@ -880,7 +880,7 @@ AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable, sourceLoc); ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression)); forwardOperator.setSourceLocation(sourceLoc); - forwardOperator.setPhysicalOperator(new ForwardPOperator()); + forwardOperator.setPhysicalOperator(new SortForwardPOperator()); forwardOperator.getInputs().add(exchangeOpFromReplicate); forwardOperator.getInputs().add(globalAggInput); OperatorManipulationUtil.setOperatorMode(forwardOperator); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 612f79e..a7bf11e 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -61,7 +61,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator; @@ -78,6 +77,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator; @@ -398,7 +398,7 @@ break; } case FORWARD: - op.setPhysicalOperator(new ForwardPOperator()); + op.setPhysicalOperator(new SortForwardPOperator()); break; } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java new file mode 100644 index 0000000..b57ad16 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.dataflow.std.base; + +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; + +// TODO(ali): forward operator should probably be moved to asterix layer +public abstract class AbstractForwardOperatorDescriptor extends AbstractOperatorDescriptor { + private static final long serialVersionUID = 1L; + protected static final int FORWARD_DATA_ACTIVITY_ID = 0; + protected static final int SIDE_DATA_ACTIVITY_ID = 1; + protected String sideDataKey; + + /** + * @param spec used to create the operator id. + * @param sideDataKey the key used to store the output of the side activity + * @param outputRecordDescriptor the output schema of this operator. 
+ */ + public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey, + RecordDescriptor outputRecordDescriptor) { + super(spec, 2, 1); + outRecDescs[0] = outputRecordDescriptor; + this.sideDataKey = sideDataKey; + } + + /** + * @return the forward data activity + */ + public abstract AbstractActivityNode createForwardDataActivity(); + + /** + * @return the side data activity + */ + public abstract AbstractActivityNode createSideDataActivity(); + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + AbstractActivityNode forwardDataActivity = createForwardDataActivity(); + AbstractActivityNode sideDataActivity = createSideDataActivity(); + + // side data activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0 + builder.addActivity(this, sideDataActivity); + builder.addSourceEdge(1, sideDataActivity, 0); + + // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0 + builder.addActivity(this, forwardDataActivity); + builder.addSourceEdge(0, forwardDataActivity, 0); + + // forward data activity will wait for the side data activity + builder.addBlockingEdge(sideDataActivity, forwardDataActivity); + + // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0 + builder.addTargetEdge(0, forwardDataActivity, 0); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java similarity index 83% rename from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java index 
49eea0a..1daf9fb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java @@ -24,7 +24,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -42,50 +41,33 @@ import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; import org.apache.hyracks.dataflow.common.utils.TaskUtil; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractStateObject; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; // TODO(ali): forward operator should probably be moved to asterix layer -public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor { +public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescriptor { private static final long serialVersionUID = 1L; - private static final int FORWARD_DATA_ACTIVITY_ID = 0; - private static final int RANGEMAP_READER_ACTIVITY_ID = 1; - private final String rangeMapKeyInContext; /** * @param spec used to create the operator id. - * @param rangeMapKeyInContext the unique key to store the range map in the shared map & transfer it to partitioner. 
+ * @param sideDataKey the unique key to store the range map in the shared map & transfer it to partitioner. * @param outputRecordDescriptor the output schema of this operator. */ - public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String rangeMapKeyInContext, + public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey, RecordDescriptor outputRecordDescriptor) { - super(spec, 2, 1); - this.rangeMapKeyInContext = rangeMapKeyInContext; - outRecDescs[0] = outputRecordDescriptor; + super(spec, sideDataKey, outputRecordDescriptor); } @Override - public void contributeActivities(IActivityGraphBuilder builder) { - ForwardDataActivity forwardDataActivity = - new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID)); - RangeMapReaderActivity rangeMapReaderActivity = - new RangeMapReaderActivity(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID)); + public AbstractActivityNode createForwardDataActivity() { + return new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID)); + } - // range map reader activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0 - builder.addActivity(this, rangeMapReaderActivity); - builder.addSourceEdge(1, rangeMapReaderActivity, 0); - - // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0 - builder.addActivity(this, forwardDataActivity); - builder.addSourceEdge(0, forwardDataActivity, 0); - - // forward data activity will wait for the range map reader activity - builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity); - - // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0 - builder.addTargetEdge(0, forwardDataActivity, 0); + @Override + public AbstractActivityNode createSideDataActivity() { + return new RangeMapReaderActivity(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID)); } /** @@ -221,9 +203,9 @@ public void open() throws 
HyracksDataException { // retrieve the range map from the state object (previous activity should have already stored it) // then deposit it into the ctx so that MToN-partition can pick it up - Object stateObjKey = new TaskId(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID), partition); + Object stateObjKey = new TaskId(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID), partition); RangeMapState rangeMapState = (RangeMapState) ctx.getStateObject(stateObjKey); - TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx); + TaskUtil.put(sideDataKey, rangeMapState.rangeMap, ctx); writer.open(); } -- To view, visit https://asterix-gerrit.ics.uci.edu/3338 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa Gerrit-Change-Number: 3338 Gerrit-PatchSet: 10 Gerrit-Owner: James Fang <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose (1000171) Gerrit-Reviewer: James Fang <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
