Repository: asterixdb Updated Branches: refs/heads/master 5aa29b86f -> b2abe1e90
[ASTERIXDB-2263][RT] Use Plan Stages To Estimate Resources - user model changes: no - storage format changes: no - interface changes: no Details: - Introduce PlanStagesGenerator that generates plan stages using blocking/two-phased operators. - Introduce OperatorResourcesComputer that calculates the estimated resources for any logical operator. - Estimate jobs required resources based on the stage that requires most resources in the plan. Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf Reviewed-on: https://asterix-gerrit.ics.uci.edu/2299 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2abe1e9 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2abe1e9 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2abe1e9 Branch: refs/heads/master Commit: b2abe1e90ee87d49265d696562f91aab6356315a Parents: 5aa29b8 Author: Murtadha Hubail <mhub...@apache.org> Authored: Tue Jan 30 02:55:08 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Tue Jan 30 04:27:45 2018 -0800 ---------------------------------------------------------------------- .../app/resource/OperatorResourcesComputer.java | 120 ++++++ .../apache/asterix/app/resource/PlanStage.java | 52 +++ .../app/resource/PlanStagesGenerator.java | 427 +++++++++++++++++++ .../org/apache/asterix/utils/ResourceUtils.java | 36 +- .../app/resource/PlanStagesGeneratorTest.java | 307 +++++++++++++ .../logical/AbstractReplicateOperator.java | 8 + 6 files changed, 941 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java new file mode 100644 index 0000000..7eaaa3d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.resource; + +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; + +public class OperatorResourcesComputer { + + public static final int MIN_OPERATOR_CORES = 1; + private static final long MAX_BUFFER_PER_CONNECTION = 1L; + + private final int numComputationPartitions; + private final long groupByMemorySize; + private final long joinMemorySize; + private final long sortMemorySize; + private final long frameSize; + + public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit, + int joinFrameLimit, long frameSize) { + this.numComputationPartitions = numComputationPartitions; + this.groupByMemorySize = groupFrameLimit * frameSize; + this.joinMemorySize = joinFrameLimit * frameSize; + this.sortMemorySize = sortFrameLimit * frameSize; + this.frameSize = frameSize; + } + + public int getOperatorRequiredCores(ILogicalOperator operator) { + if (operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || operator.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + return numComputationPartitions; + } + return MIN_OPERATOR_CORES; + } + + public long getOperatorRequiredMemory(ILogicalOperator operator) { + switch (operator.getOperatorTag()) { + case AGGREGATE: + case ASSIGN: + case DATASOURCESCAN: + case DISTINCT: + case DISTRIBUTE_RESULT: + case EMPTYTUPLESOURCE: + case DELEGATE_OPERATOR: + case EXTERNAL_LOOKUP: + case LEFT_OUTER_UNNEST_MAP: + case LIMIT: + case MATERIALIZE: + case NESTEDTUPLESOURCE: + case PROJECT: + case REPLICATE: + case RUNNINGAGGREGATE: + case SCRIPT: + case SELECT: + case SINK: + 
case SPLIT: + case SUBPLAN: + case TOKENIZE: + case UNIONALL: + case UNNEST: + case LEFT_OUTER_UNNEST: + case UNNEST_MAP: + case UPDATE: + case WRITE: + case WRITE_RESULT: + case INDEX_INSERT_DELETE_UPSERT: + case INSERT_DELETE_UPSERT: + case INTERSECT: + return getOperatorRequiredMemory(operator, frameSize); + case EXCHANGE: + return getExchangeRequiredMemory((ExchangeOperator) operator); + case GROUP: + return getOperatorRequiredMemory(operator, groupByMemorySize); + case ORDER: + return getOperatorRequiredMemory(operator, sortMemorySize); + case INNERJOIN: + case LEFTOUTERJOIN: + return getOperatorRequiredMemory(operator, joinMemorySize); + default: + throw new IllegalStateException("Unrecognized operator: " + operator.getOperatorTag()); + } + } + + private long getOperatorRequiredMemory(ILogicalOperator op, long memorySize) { + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + return memorySize * numComputationPartitions; + } + return memorySize; + } + + private long getExchangeRequiredMemory(ExchangeOperator op) { + final IPhysicalOperator physicalOperator = op.getPhysicalOperator(); + final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag(); + if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE + || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) { + return getOperatorRequiredMemory(op, frameSize); + } + return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java new file mode 100644 index 0000000..1e623c5 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStage.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.resource; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; + +public class PlanStage { + + private final Set<ILogicalOperator> operators = new HashSet<>(); + private final int stageId; + + PlanStage(int stageId) { + this.stageId = stageId; + } + + @Override + public String toString() { + return "Stage{" + "stageId=" + stageId + ", operators(" + operators.size() + ")" + "=" + operators + '}'; + } + + public Set<ILogicalOperator> getOperators() { + return operators; + } + + public long getRequiredMemory(OperatorResourcesComputer resourcesComputer) { + return operators.stream().mapToLong(resourcesComputer::getOperatorRequiredMemory).sum(); + } + + public int getRequiredCores(OperatorResourcesComputer resourcesComputer) { + return operators.stream().mapToInt(resourcesComputer::getOperatorRequiredCores).max() + .orElse(OperatorResourcesComputer.MIN_OPERATOR_CORES); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java new file mode 100644 index 0000000..8b32375 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.resource; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; +import org.apache.hyracks.util.annotations.NotThreadSafe; + +@NotThreadSafe +public class PlanStagesGenerator implements ILogicalOperatorVisitor<Void, Void> { + + private static final int JOIN_FIRST_INPUT = 1; + private static final int JOIN_SECOND_INPUT = 2; + private final Set<ILogicalOperator> visitedOperators = new HashSet<>(); + private final LinkedList<ILogicalOperator> pendingBlockingOperators = new LinkedList<>(); + private final List<PlanStage> stages = new ArrayList<>(); + private PlanStage currentStage; + private int stageCounter; + + public PlanStagesGenerator() { + currentStage = new PlanStage(++stageCounter); + stages.add(currentStage); + } + + @Override + public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void 
visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a replicate operator is only visited once. + if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + visit(op); + } else { + merge(op); + } + return null; + } + + @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a split operator is only visited once. 
+ if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + visit(op); + } else { + merge(op); + } + return null; + } + + @Override + public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; 
+ } + + @Override + public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) + throws AlgebricksException { + visit(op); + return null; + } + + @Override + public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException { + visit(op); + return null; + } + + public List<PlanStage> getStages() { + return stages; + } + + private void visit(ILogicalOperator op) throws AlgebricksException { + addToStage(op); + if (!pendingBlockingOperators.isEmpty()) { + final ILogicalOperator firstPending = pendingBlockingOperators.pop(); + visitBlocking(firstPending); + } + } + + private void visitBlocking(ILogicalOperator blockingOp) throws AlgebricksException { + final PlanStage blockingOpStage = new PlanStage(++stageCounter); + blockingOpStage.getOperators().add(blockingOp); + stages.add(blockingOpStage); + currentStage = blockingOpStage; + switch (blockingOp.getOperatorTag()) { + case INNERJOIN: + case LEFTOUTERJOIN: + // visit only the second input + ILogicalOperator joinSecondInput = getJoinOperatorInput(blockingOp, JOIN_SECOND_INPUT); + joinSecondInput.accept(this, null); + break; + case GROUP: + case ORDER: + visitInputs(blockingOp); + break; + default: + throw new IllegalStateException("Unrecognized blocking operator: " + blockingOp.getOperatorTag()); + } + } + + private void addToStage(ILogicalOperator op) throws 
AlgebricksException { + currentStage.getOperators().add(op); + switch (op.getOperatorTag()) { + case INNERJOIN: + case LEFTOUTERJOIN: + pendingBlockingOperators.add(op); + // continue on the same stage + final ILogicalOperator joinFirstInput = getJoinOperatorInput(op, JOIN_FIRST_INPUT); + joinFirstInput.accept(this, null); + break; + case GROUP: + if (isBlockingGroupBy((GroupByOperator) op)) { + pendingBlockingOperators.add(op); + return; + } + // continue on the same stage + visitInputs(op); + break; + case ORDER: + pendingBlockingOperators.add(op); + break; + default: + visitInputs(op); + break; + } + } + + private void visitInputs(ILogicalOperator op) throws AlgebricksException { + if (isMaterialized(op)) { + // don't visit the inputs of this operator since it is supposed to be blocking due to materialization. + // some other non-blocking operator will visit those inputs when reached. + return; + } + for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { + inputOpRef.getValue().accept(this, null); + } + } + + private boolean isBlockingGroupBy(GroupByOperator op) { + return op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.EXTERNAL_GROUP_BY + || op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.SORT_GROUP_BY; + } + + /** + * Checks whether the operator {@code op} is supposed to be materialized + * due to a replicate/split operators. + * + * @param op + * @return true if the operator will be materialized. 
Otherwise false + */ + private boolean isMaterialized(ILogicalOperator op) { + for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { + final ILogicalOperator inputOp = inputOpRef.getValue(); + final LogicalOperatorTag inputOpTag = inputOp.getOperatorTag(); + if (inputOpTag == LogicalOperatorTag.REPLICATE || inputOpTag == LogicalOperatorTag.SPLIT) { + final AbstractReplicateOperator replicateOperator = (AbstractReplicateOperator) inputOp; + if (replicateOperator.isMaterialized(op)) { + return true; + } + } + } + return false; + } + + private ILogicalOperator getJoinOperatorInput(ILogicalOperator op, int inputNum) { + if (inputNum != JOIN_FIRST_INPUT && inputNum != JOIN_SECOND_INPUT) { + throw new IllegalArgumentException("invalid input number for join operator"); + } + final List<Mutable<ILogicalOperator>> inputs = op.getInputs(); + if (inputs.size() != 2) { + throw new IllegalStateException("Join must have exactly two inputs. Current inputs: " + inputs.size()); + } + return op.getInputs().get(inputNum - 1).getValue(); + } + + /** + * Merges all operators on the current stage to the stage on which {@code op} appeared. 
+ * + * @param op + */ + private void merge(ILogicalOperator op) { + // all operators in this stage belong to the stage of the already visited op + for (PlanStage stage : stages) { + if (stage != currentStage && stage.getOperators().contains(op)) { + stage.getOperators().addAll(currentStage.getOperators()); + stages.remove(currentStage); + currentStage = stage; + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java index ccda1e7..0149ffa 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java @@ -19,7 +19,11 @@ package org.apache.asterix.utils; -import org.apache.asterix.app.resource.RequiredCapacityVisitor; +import java.util.List; + +import org.apache.asterix.app.resource.OperatorResourcesComputer; +import org.apache.asterix.app.resource.PlanStage; +import org.apache.asterix.app.resource.PlanStagesGenerator; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -54,16 +58,30 @@ public class ResourceUtils { final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort(); final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy(); final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin(); + final List<PlanStage> planStages = getStages(plan); + return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit, 
+ groupFrameLimit, joinFrameLimit, frameSize); + } - // Creates a cluster capacity visitor. - IClusterCapacity clusterCapacity = new ClusterCapacity(); - RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length, - sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity); - + public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException { // There could be only one root operator for a top-level query plan. - ILogicalOperator rootOp = plan.getRoots().get(0).getValue(); - rootOp.accept(visitor, null); - return clusterCapacity; + final ILogicalOperator rootOp = plan.getRoots().get(0).getValue(); + final PlanStagesGenerator stagesGenerator = new PlanStagesGenerator(); + rootOp.accept(stagesGenerator, null); + return stagesGenerator.getStages(); } + public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations, + int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int frameSize) { + final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit, + groupFrameLimit, joinFrameLimit, frameSize); + final IClusterCapacity clusterCapacity = new ClusterCapacity(); + final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max() + .orElseThrow(IllegalStateException::new); + clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory); + final Integer maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max() + .orElseThrow(IllegalStateException::new); + clusterCapacity.setAggregatedCores(maxRequireCores); + return clusterCapacity; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java new file mode 100644 index 0000000..0e55b1e --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.app.resource; + +import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.GROUP; +import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.INNERJOIN; +import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.LEFTOUTERJOIN; +import static org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag.ORDER; +import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.LOCAL; +import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.PARTITIONED; +import static org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode.UNPARTITIONED; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; + +import org.apache.asterix.utils.ResourceUtils; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; 
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Builds small logical plans by hand and checks both the stages returned by
+ * {@code ResourceUtils.getStages} and the aggregated memory reported by
+ * {@code ResourceUtils.getStageBasedRequiredCapacity} for them.
+ */
+public class PlanStagesGeneratorTest {
+
+    // logical operators that validateOperatorStages expects to find in exactly two stages
+    private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
+            new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
+    private static final long MEMORY_BUDGET = 33554432L;
+    private static final int FRAME_SIZE = 32768;
+    private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
+    private static final int PARALLELISM = 10;
+    private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+    /**
+     * A plan without blocking operators must produce a single stage that needs one frame per operator.
+     */
+    @Test
+    public void noBlockingPlan() throws AlgebricksException {
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(UNPARTITIONED);
+
+        AssignOperator assignOperator = new AssignOperator(Collections.emptyList(), null);
+        assignOperator.setExecutionMode(UNPARTITIONED);
+        assignOperator.getInputs().add(new MutableObject<>(ets));
+
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setExecutionMode(UNPARTITIONED);
+        exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchange.getInputs().add(new MutableObject<>(assignOperator));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(UNPARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(exchange));
+        ALogicalPlanImpl plan = createPlan(resultOperator);
+
+        List<PlanStage> stages = ResourceUtils.getStages(plan);
+        // ensure a single stage plan
+        final int expectedStages = 1;
+        Assert.assertEquals(expectedStages, stages.size());
+        validateStages(stages, resultOperator, exchange, ets, assignOperator);
+        // frame size for every operator
+        final long expectedMemory = (long) stages.get(0).getOperators().size() * FRAME_SIZE;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * A pre-clustered group by followed by an order by: only the order by is expected to split the plan.
+     */
+    @Test
+    public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.getInputs().add(new MutableObject<>(ets));
+
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setExecutionMode(PARTITIONED);
+        exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchange.getInputs().add(new MutableObject<>(scanOperator));
+
+        GroupByOperator groupByOperator = new GroupByOperator();
+        groupByOperator.setExecutionMode(PARTITIONED);
+        groupByOperator
+                .setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+        groupByOperator.getInputs().add(new MutableObject<>(exchange));
+
+        OrderOperator orderOperator = new OrderOperator();
+        orderOperator.setExecutionMode(PARTITIONED);
+        orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(orderOperator));
+        ALogicalPlanImpl plan = createPlan(resultOperator);
+
+        final List<PlanStage> stages = ResourceUtils.getStages(plan);
+        validateStages(stages, ets, scanOperator, exchange, groupByOperator, orderOperator, resultOperator);
+        // ensure 2 stages (root to order, order to ets); the pre-clustered group by does not add a stage
+        final int expectedStages = 2;
+        Assert.assertEquals(expectedStages, stages.size());
+
+        // dominating stage should have orderBy, orderBy's input (groupby), groupby's input (exchange),
+        // exchange's input (scanOperator), and scanOperator's input (ets)
+        // the products are widened to long before multiplying so that larger constants cannot overflow int
+        long orderOperatorRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long groupByOperatorRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long exchangeRequiredMemory = (long) PARALLELISM * FRAME_SIZE;
+        long scanOperatorRequiredMemory = (long) PARALLELISM * FRAME_SIZE;
+        long etsRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+
+        final long expectedMemory = orderOperatorRequiredMemory + groupByOperatorRequiredMemory + exchangeRequiredMemory
+                + scanOperatorRequiredMemory + etsRequiredMemory;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * Two joins, the second one fed by a hash-partitioned external group by: four stages are expected.
+     */
+    @Test
+    public void testJoinGroupby() throws AlgebricksException {
+        EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
+        ets1.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator1 = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator1.setExecutionMode(PARTITIONED);
+        scanOperator1.getInputs().add(new MutableObject<>(ets1));
+
+        EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
+        // configure ets2 itself (this line used to set the mode on ets1 a second time)
+        ets2.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator2 = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator2.setExecutionMode(PARTITIONED);
+        scanOperator2.getInputs().add(new MutableObject<>(ets2));
+
+        InnerJoinOperator firstJoin = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        firstJoin.setExecutionMode(PARTITIONED);
+        firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
+        firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
+
+        ExchangeOperator exchangeOperator1 = new ExchangeOperator();
+        exchangeOperator1.setExecutionMode(PARTITIONED);
+        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
+
+        EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
+        // configure ets3 itself (this line used to set the mode on ets1 a third time)
+        ets3.setExecutionMode(PARTITIONED);
+
+        GroupByOperator groupByOperator = new GroupByOperator();
+        groupByOperator
+                .setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+        groupByOperator.setExecutionMode(LOCAL);
+        groupByOperator.getInputs().add(new MutableObject<>(ets3));
+
+        ExchangeOperator exchangeOperator2 = new ExchangeOperator();
+        exchangeOperator2.setExecutionMode(PARTITIONED);
+        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
+
+        LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
+        secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(secondJoin));
+        ALogicalPlanImpl plan = createPlan(resultOperator);
+
+        List<PlanStage> stages = ResourceUtils.getStages(plan);
+        final int expectedStages = 4;
+        Assert.assertEquals(expectedStages, stages.size());
+        validateStages(stages, ets1, scanOperator1, ets2, scanOperator2, firstJoin, exchangeOperator1, ets3,
+                groupByOperator, exchangeOperator2, secondJoin, resultOperator);
+
+        // dominating stage should have the following operators:
+        // resultOperator, its input (secondJoin), secondJoin's first input (exchangeOperator1), exchangeOperator1's
+        // input (firstJoin), firstJoin's first input (scanOperator1), and scanOperator1's input (ets1)
+        long resultOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+        long secondJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * PARALLELISM * PARALLELISM * FRAME_SIZE;
+        long firstJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long scanOperator1RequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+        long ets1RequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+
+        long expectedMemory = resultOperatorRequiredMemory + secondJoinRequiredMemory + exchangeOperator1RequiredMemory
+                + firstJoinRequiredMemory + scanOperator1RequiredMemory + ets1RequiredMemory;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * A replicate operator feeding two order operators that are joined again: three stages are expected.
+     */
+    @Test
+    public void testReplicateSortJoin() throws AlgebricksException {
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        ets.setExecutionMode(PARTITIONED);
+
+        DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
+        scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.getInputs().add(new MutableObject<>(ets));
+
+        ReplicateOperator replicateOperator = new ReplicateOperator(2);
+        replicateOperator.setExecutionMode(PARTITIONED);
+        replicateOperator.getInputs().add(new MutableObject<>(scanOperator));
+
+        OrderOperator order1 = new OrderOperator();
+        order1.setExecutionMode(PARTITIONED);
+        order1.setPhysicalOperator(new OneToOneExchangePOperator());
+        order1.getInputs().add(new MutableObject<>(replicateOperator));
+
+        OrderOperator order2 = new OrderOperator();
+        order2.setExecutionMode(PARTITIONED);
+        order2.setPhysicalOperator(new OneToOneExchangePOperator());
+        order2.getInputs().add(new MutableObject<>(replicateOperator));
+
+        LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
+        secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.getInputs().add(new MutableObject<>(order1));
+        secondJoin.getInputs().add(new MutableObject<>(order2));
+
+        DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
+        resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.getInputs().add(new MutableObject<>(secondJoin));
+        ALogicalPlanImpl plan = createPlan(resultOperator);
+
+        List<PlanStage> stages = ResourceUtils.getStages(plan);
+        final int expectedStages = 3;
+        Assert.assertEquals(expectedStages, stages.size());
+        // pass every operator of the plan so that the "all operators appear" check is not vacuous
+        validateStages(stages, ets, scanOperator, replicateOperator, order1, order2, secondJoin, resultOperator);
+
+        // dominating stage should have the following operators:
+        // secondJoin, secondJoin's second input (order2), order2's input (replicate),
+        // replicate's input (scanOperator), scanOperator's input (ets)
+        long secondJoinRequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long order2RequiredMemory = (long) FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long replicateOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+        long scanOperatorRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+        long etsRequiredMemory = (long) FRAME_SIZE * PARALLELISM;
+        long expectedMemory = secondJoinRequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
+                + scanOperatorRequiredMemory + etsRequiredMemory;
+        assertRequiredMemory(stages, expectedMemory);
+    }
+
+    /**
+     * Wraps {@code root} into a single-root logical plan.
+     *
+     * @param root
+     *            the operator at the top of the plan
+     * @return a plan whose only root is {@code root}
+     */
+    private static ALogicalPlanImpl createPlan(ILogicalOperator root) {
+        // explicit type argument instead of the raw MutableObject so that no unchecked conversion is needed
+        return new ALogicalPlanImpl(Collections.singletonList(new MutableObject<ILogicalOperator>(root)));
+    }
+
+    /**
+     * Checks that every given operator is part of at least one stage and that every operator found in the
+     * stages appears in the expected number of stages.
+     *
+     * @param stages
+     *            the stages generated for the plan under test
+     * @param operators
+     *            the operators that must be present in the stages
+     */
+    private void validateStages(List<PlanStage> stages, ILogicalOperator... operators) {
+        // ensure all operators appear
+        Stream.of(operators).forEach(op -> ensureOperatorExists(stages, op));
+        // ensure the correct count
+        for (PlanStage stage : stages) {
+            stage.getOperators().forEach(op -> validateOperatorStages(stages, op));
+        }
+    }
+
+    /**
+     * Fails unless {@code operator} is contained in at least one of {@code stages}.
+     */
+    private void ensureOperatorExists(List<PlanStage> stages, ILogicalOperator operator) {
+        final long actual = countStagesContaining(stages, operator);
+        Assert.assertTrue("operator " + operator.getOperatorTag() + " does not appear in any stage", actual > 0);
+    }
+
+    /**
+     * Checks the number of stages {@code operator} appears in: two for the operators in
+     * {@link #BLOCKING_OPERATORS} and for group by operators whose physical operator is an external or sort
+     * group by, one for everything else. Single-stage plans are not checked.
+     */
+    private void validateOperatorStages(List<PlanStage> stages, ILogicalOperator operator) {
+        if (stages.size() == 1) {
+            return;
+        }
+        long expectedAppearances = BLOCKING_OPERATORS.contains(operator.getOperatorTag()) ? 2 : 1;
+        if (operator.getOperatorTag() == GROUP) {
+            GroupByOperator groupByOperator = (GroupByOperator) operator;
+            final PhysicalOperatorTag physicalTag = groupByOperator.getPhysicalOperator().getOperatorTag();
+            if (physicalTag == PhysicalOperatorTag.EXTERNAL_GROUP_BY
+                    || physicalTag == PhysicalOperatorTag.SORT_GROUP_BY) {
+                expectedAppearances = 2;
+            }
+        }
+        final long actual = countStagesContaining(stages, operator);
+        Assert.assertEquals(expectedAppearances, actual);
+    }
+
+    /**
+     * Counts the stages whose operators contain {@code operator}.
+     */
+    private static long countStagesContaining(List<PlanStage> stages, ILogicalOperator operator) {
+        return stages.stream().map(PlanStage::getOperators).filter(ops -> ops.contains(operator)).count();
+    }
+
+    /**
+     * Asserts that the capacity computed for {@code stages} aggregates to {@code expectedMemory} bytes.
+     * The same frame limit is passed for all three limit arguments of
+     * {@code getStageBasedRequiredCapacity} (presumably the sort, group and join limits; confirm against
+     * ResourceUtils).
+     */
+    private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
+        final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
+                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+        // JUnit expects (expected, actual); the swapped order produced misleading failure messages
+        Assert.assertEquals(expectedMemory, clusterCapacity.getAggregatedMemoryByteSize());
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2abe1e9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index 852c392..3bb0f47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -109,4 +109,22 @@ public abstract class AbstractReplicateOperator extends AbstractLogicalOperator
         return false;
     }
 
+    /**
+     * Reports the materialization flag recorded for the output that is {@code op}.
+     *
+     * @param op
+     *            the operator to look up among this operator's outputs; compared by reference
+     * @return the materialization flag stored at the position of {@code op} in the outputs, or {@code false}
+     *         when {@code op} is not one of the outputs
+     */
+    public boolean isMaterialized(ILogicalOperator op) {
+        int position = 0;
+        while (position < outputs.size()) {
+            if (op == outputs.get(position).getValue()) {
+                return outputMaterializationFlags[position];
+            }
+            position++;
+        }
+        return false;
+    }
 }