[email protected] has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2299
Change subject: Add stage-based memory estimation for query jobs ...................................................................... Add stage-based memory estimation for query jobs Add StageBasedRequiredCapacityVisitor, which splits a logical query plan into stages at blocking operators (join, group-by, order) and records the largest per-stage memory requirement via IClusterCapacity.setMaxMemoryOverStagesByteSize(), alongside the existing aggregated (conservative) memory estimate. The following commits from your working branch will be included: commit 6fa8ba4ddfc5cd7745f7f670377bfc8f87563918 Author: Shiva <[email protected]> Date: Thu Jan 18 08:38:13 2018 -0800 State-aware memory calculation commit 04cb85ebecce27de62022a6647f54a17dc94b8c5 Author: Shiva <[email protected]> Date: Thu Sep 21 23:45:15 2017 -0700 fixed bug commit 8c96a008b809ef9d0b28ff75d927ea51ed8abe4c Author: Shiva <[email protected]> Date: Tue Sep 19 22:49:58 2017 -0700 addressed code review commit a3de58bef92573216774e90f0e55eeb0cd3ab1e8 Author: shivajah <[email protected]> Date: Wed Jun 28 08:52:19 2017 -0700 json added Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java M asterixdb/asterix-app/pom.xml M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java 14 files changed, 669 insertions(+), 11 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/99/2299/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java index 1160aaa..e44767b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java @@ -248,4 +248,4 @@ Boolean value = flags.get(flag); return value == null ? 
false : value.booleanValue(); } -} +} \ No newline at end of file diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index dc00bea..1c0389a 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -283,6 +283,10 @@ </profiles> <dependencies> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-control-cc</artifactId> </dependency> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index 4ecd978..0e51953 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -94,8 +94,7 @@ public void compile(boolean optimize, boolean printRewrittenExpressions, boolean printLogicalPlan, boolean printOptimizedPlan, boolean printPhysicalOpsOnly, boolean generateBinaryRuntime, boolean printJob, - PlanFormat pformat) - throws Exception { + PlanFormat pformat) throws Exception { queryJobSpec = null; dmlJobs = null; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java new file mode 100644 index 0000000..a21f263 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.resource; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; +import org.apache.hyracks.api.job.resource.IClusterCapacity; + +public class StageBasedRequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Void> { + + + + private static final long MAX_BUFFER_PER_CONNECTION = 1L; + + private Stage currentStage; + List<Stage> stages; + private final long numComputationPartitions; + private final long groupByMemorySize; + private final long joinMemorySize; + private final long sortMemorySize; + private final long frameSize; + private final IClusterCapacity clusterCapacity; + private final Set<ILogicalOperator> visitedOperators = new HashSet<>(); + private long maxStagebasedMemory = 0L; + private int printC = 0; + + public StageBasedRequiredCapacityVisitor(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit, + int joinFrameLimit, int frameSize, IClusterCapacity clusterCapacity) { + this.numComputationPartitions = numComputationPartitions; + this.frameSize = frameSize; + this.groupByMemorySize = groupFrameLimit * (long) frameSize; + this.joinMemorySize = joinFrameLimit * (long) frameSize; + this.sortMemorySize = sortFrameLimit * (long) frameSize; + this.clusterCapacity = clusterCapacity; + this.clusterCapacity.setAggregatedCores(1); // At least one core is needed. 
+ this.stages = new LinkedList<>(); + } + + @Override + public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException { + addToStage(op,false); + return null; + } + + @Override + public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException { + addToStage(op,false); + return null; + } + + @Override + public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException { + addToStage(op,false); + return null; + } + + @Override + public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException { + addToStage(op,false); + return null; + } + + @Override + public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitProjectOperator(ProjectOperator op, Void arg) 
throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a replicate operator is only visited once. + if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + addToStage(op, true); + } + return null; + } + + @Override + public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException { + // Makes sure that the downstream of a split operator is only visited once. + if (!visitedOperators.contains(op)) { + visitedOperators.add(op); + addToStage(op, true); + } + return null; + } + + @Override + public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws 
AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException { + calculateMemoryUsageForExchange(op); + return null; + } + + @Override + public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + printResults(); + return null; + } + + @Override + public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) + throws AlgebricksException { + addToStage(op, true); + return null; + } + + @Override + public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException { + addToStage(op, true); + return null; + } + + // Calculates the memory usage for exchange operators. 
+ private void calculateMemoryUsageForExchange(ExchangeOperator op) throws AlgebricksException { + IPhysicalOperator physicalOperator = op.getPhysicalOperator(); + PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag(); + if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE + || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) { + addOutputBuffer(op); + } + else { + long opMem = 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize; + currentStage.addToMemory(opMem); + setMaxStageMemory(); + } + addToStage(op, false); + } + + // Calculates the cluster-wide memory usage for blocking activities like group-by, sort, and join. + private void calculateMemoryUsageForBlockingOperators(ILogicalOperator op, long memSize) + throws AlgebricksException { + long opMem; + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + opMem = memSize * numComputationPartitions; + } else { + opMem = memSize; + } + currentStage.addToMemory(opMem); + setMaxStageMemory(); + } + + // Recursively visits input operators of an operator and sets the CPU core usage. 
+ private void addToStage(ILogicalOperator op, boolean toAddOutputBuffer) throws AlgebricksException { + setAvailableCores(op); + + switch(op.getOperatorTag()){//toAddOutputBuffer for blocked operators is always false + case INNERJOIN: + case LEFTOUTERJOIN: { + PhasedOperator pOp1 = new PhasedOperator(op, 1); + currentStage.getOperators().add(pOp1); + calculateMemoryUsageForBlockingOperators(op, joinMemorySize); + op.getInputs().get(op.getPhaseToInput().get(1)).getValue().accept(this, null); + PhasedOperator pOp0 = new PhasedOperator(op, 0); + createStage(pOp0); + calculateMemoryUsageForBlockingOperators(op, joinMemorySize); + op.getInputs().get(op.getPhaseToInput().get(0)).getValue().accept(this, null); + break; + } + case GROUP: { + addToStageOrderGroupby(op, groupByMemorySize); + break; + } + case ORDER: { + addToStageOrderGroupby(op, sortMemorySize); + break; + } + default: { + PhasedOperator pOp = new PhasedOperator(op, 0); + if (stages.isEmpty()) { + createStage(pOp); + } else { + currentStage.getOperators().add(pOp); + } + if (toAddOutputBuffer) { + addOutputBuffer(op); + } + for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { + inputOpRef.getValue().accept(this, null); + } + break; + } + } + } + + private void createStage(PhasedOperator pOp){ + currentStage = new Stage(); + currentStage.rootOperator = pOp; + currentStage.getOperators().add(pOp); + stages.add(currentStage); + } + + private void addToStageOrderGroupby(ILogicalOperator op, long memory)throws AlgebricksException{ + PhasedOperator pOp1 = new PhasedOperator(op, 1); + currentStage.getOperators().add(pOp1); + calculateMemoryUsageForBlockingOperators(op, memory); + PhasedOperator pOp0 = new PhasedOperator(op, 0); + createStage(pOp0); + calculateMemoryUsageForBlockingOperators(op, memory); + for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { + inputOpRef.getValue().accept(this, null); + } + } + + // Adds output buffer for an operator. 
+ private void addOutputBuffer(ILogicalOperator op) { + long opMem; + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + opMem = frameSize * numComputationPartitions; // every operator needs one output buffer. + } else { + opMem = frameSize; // every operator needs one output buffer. + } + currentStage.addToMemory(opMem); + setMaxStageMemory(); + } + + // Sets the number of available cores + private void setAvailableCores(ILogicalOperator op) { + if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED + || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) { + clusterCapacity.setAggregatedCores((int) numComputationPartitions); + } + } + private void setMaxStageMemory(){ + if (currentStage.getRequiredMemory() > maxStagebasedMemory) { + maxStagebasedMemory = currentStage.getRequiredMemory(); + clusterCapacity.setMaxMemoryOverStagesByteSize(maxStagebasedMemory); + } + } + + public class Stage { + private long requiredMemory = 0L; + private List<PhasedOperator> operators; + + + private PhasedOperator rootOperator; + + public long getRequiredMemory() { + return requiredMemory; + } + public Stage(){ + operators = new LinkedList<>(); + } + + public List<PhasedOperator> getOperators() { + return operators; + } + + public void setOperators(List<PhasedOperator> operators) { + this.operators = operators; + } + + public void addToMemory(long opMem){ + this.requiredMemory += opMem; + } + public PhasedOperator getRootOperator() { + return rootOperator; + } + + } + + public class PhasedOperator{ + ILogicalOperator op; + int phase; + public PhasedOperator(ILogicalOperator op, int phase){ + this.op = op; + this.phase = phase; + } + } + + public void printResults(){ + String FILENAME = "/tmp/memoryData.txt"; + BufferedWriter bw = null; + FileWriter fw = null; + String data=""; + try { + if (printC == 0) { + data = "Stages: "; + String 
stagesStr = ""; + for (Stage s : stages) { + stagesStr += + "Root operator: " + s.getRootOperator().op.getOperatorTag() + ":" + s.getRootOperator().phase + ",\n"; + stagesStr += "Operators: "; + int i = 0; + for (PhasedOperator op : s.getOperators()) { + if (i > 0) + stagesStr += ","; + stagesStr += "{ "; + stagesStr += (op.op.getOperatorTag() == LogicalOperatorTag.EXCHANGE) ? + ((ExchangeOperator) op.op).getPhysicalOperator().getOperatorTag() : + op.op.getOperatorTag(); + stagesStr += ":" + op.phase + "(" + op.op.getExecutionMode() + ")}"; + i++; + } + stagesStr += "\n"; + stagesStr += "req-mem: " + s.requiredMemory + "\n =====\n"; + } + data += stagesStr; + printC++; + } + data += "#Cores: "+clusterCapacity.getAggregatedCores()+ + ",\t Conservative memory(bytes): "+clusterCapacity.getAggregatedMemoryByteSize()+",\t Stage-aware memory(bytes) : "+clusterCapacity.getMaxMemoryOverStagesByteSize() + + "\t Conservative/stage-aware : "+ clusterCapacity.getAggregatedMemoryByteSize()/clusterCapacity.getMaxMemoryOverStagesByteSize()+"\n"; + File file = new File(FILENAME); + + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + + // true = append file + fw = new FileWriter(file.getAbsoluteFile(), true); + bw = new BufferedWriter(fw); + + bw.write(data); + + } catch (IOException e) { + + e.printStackTrace(); + + } finally { + + try { + + if (bw != null) + bw.close(); + + if (fw != null) + fw.close(); + + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java index 61c1dfe..0c11088 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java @@ -20,6 +20,7 @@ package org.apache.asterix.utils; import 
org.apache.asterix.app.resource.RequiredCapacityVisitor; +import org.apache.asterix.app.resource.StageBasedRequiredCapacityVisitor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -63,7 +64,11 @@ // There could be only one root operator for a top-level query plan. ILogicalOperator rootOp = plan.getRoots().get(0).getValue(); + + StageBasedRequiredCapacityVisitor stageVisitor = new StageBasedRequiredCapacityVisitor(computationLocations.getLocations().length, + sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity); rootOp.accept(visitor, null); + rootOp.accept(stageVisitor,null); return clusterCapacity; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java index 707a7db..2c825ad 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java @@ -36,6 +36,8 @@ public interface ILogicalOperator { + public Map<Integer, Integer> getPhaseToInput(); + public LogicalOperatorTag getOperatorTag(); public ExecutionMode getExecutionMode(); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java index 8e5ce06..a962a38 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java @@ -19,6 +19,8 @@ package org.apache.hyracks.algebricks.core.algebra.operators.logical; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; import org.apache.commons.lang3.mutable.Mutable; @@ -33,6 +35,12 @@ protected final Mutable<ILogicalExpression> condition; protected JoinKind joinKind; + public Map<Integer, Integer> getPhaseToInput() { + return phaseToInput; + } + + protected Map<Integer,Integer> phaseToInput; + public enum JoinKind { INNER, LEFT_OUTER @@ -41,6 +49,9 @@ public AbstractBinaryJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition) { this.joinKind = joinKind; this.condition = condition; + this.phaseToInput = new HashMap<>(); + phaseToInput.put(0,1); + phaseToInput.put(1,0); } public AbstractBinaryJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java index 4686f32..8f6d28c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java @@ -18,10 +18,7 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.logical; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; 
+import java.util.*; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -60,10 +57,11 @@ */ LOCAL } + Map<Integer,Integer> phaseToInput = new HashMap<>(); private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED; protected IPhysicalOperator physicalOperator; - private final Map<String, Object> annotations = new HashMap<>(); + private final Map<String, Object> annotations = new IdentityHashMap<String, Object>(); private boolean bJobGenEnabled = true; protected final List<Mutable<ILogicalOperator>> inputs; @@ -74,6 +72,9 @@ } @Override + public Map<Integer,Integer> getPhaseToInput(){ return phaseToInput; } + + @Override public abstract LogicalOperatorTag getOperatorTag(); @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index 4283198..9af89b5 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -683,4 +683,4 @@ } buffer.append("]"); } -} +} \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java index 67640c5..f17ec10 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java @@ -37,4 +37,4 @@ throws AlgebricksException { AbstractLogicalOperatorPrettyPrintVisitor.printPhysicalOps(plan, out, indent); } -} +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java index cd6362f..4ced5a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java @@ -18,6 +18,10 @@ */ package org.apache.hyracks.api.client.impl; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -25,6 +29,7 @@ import java.util.Map; import java.util.Set; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivity; @@ -149,6 +154,8 @@ if (LOGGER.isDebugEnabled()) { LOGGER.debug(acg.toJSON().asText()); } + printResults(acg); + return acg; } @@ -169,4 +176,42 @@ eqSetMap.put(t, mergedSet); } } + + private void printResults(ActivityClusterGraph acg ){ + String FILENAME = "/tmp/hyracks_stages.txt"; + BufferedWriter bw = null; + FileWriter fw = null; + try { + File file = new File(FILENAME); + + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + + // true = append file 
+ fw = new FileWriter(file.getAbsoluteFile(), true); + bw = new BufferedWriter(fw); + + bw.write(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(acg.toJSON())); + + } catch (IOException e) { + + e.printStackTrace(); + + } finally { + + try { + + if (bw != null) + bw.close(); + + if (fw != null) + fw.close(); + + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index 9f66080..82cdfe0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.constraints.expressions.ConstantExpression; @@ -371,12 +372,38 @@ } } } + int i = 0; + ObjectNode inp = om.createObjectNode(); + if(!opInputMap.isEmpty() && opInputMap.containsKey(key)) { + Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection; + if (opInputMap.get(key).size()==2){ + connection = connectorOpMap.get(opInputMap.get(key).get(1).getConnectorId()); + if (connection != null) { + inp.put("build-input", connection.getLeft().getLeft().getOperatorId().toString()); + } + connection = connectorOpMap.get(opInputMap.get(key).get(0).getConnectorId()); + if (connection != null) { + inp.put("probe-input", connection.getLeft().getLeft().getOperatorId().toString()); + } + } + else{ + connection = connectorOpMap.get(opInputMap.get(key).get(0).getConnectorId()); + if (connection != null) { + inp.put("input", 
connection.getLeft().getLeft().getOperatorId().toString()); + } + } + } if (pleObject.size() > 0) { pcObject.set("location", pleObject); } + if (pcObject.size() > 0) { op.set("partition-constraints", pcObject); } + + if(inp.size() > 0) { + op.set("inputs", inp); + } } jopArray.add(op); }); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java index 25e3255..e9dc2b8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java @@ -28,11 +28,17 @@ public class ClusterCapacity implements IClusterCapacity { + + + private long maxMemoryOverStagesByteSize = 0L; private long aggregatedMemoryByteSize = 0; private int aggregatedCores = 0; private final Map<String, Long> nodeMemoryMap = new HashMap<>(); private final Map<String, Integer> nodeCoreMap = new HashMap<>(); - + @Override + public long getMaxMemoryOverStagesByteSize() { + return maxMemoryOverStagesByteSize; + } @Override public long getAggregatedMemoryByteSize() { return aggregatedMemoryByteSize; @@ -70,6 +76,11 @@ } @Override + public void setMaxMemoryOverStagesByteSize(long maxMemoryOverStagesByteSize) { + this.maxMemoryOverStagesByteSize = maxMemoryOverStagesByteSize; + } + + @Override public void setMemoryByteSize(String nodeId, long memoryByteSize) { nodeMemoryMap.put(nodeId, memoryByteSize); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java index ac3261d..b848a7f 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java @@ -25,6 +25,7 @@ * This interface abstracts the mutable capacity for a cluster. */ public interface IClusterCapacity extends IReadOnlyClusterCapacity { + public long getMaxMemoryOverStagesByteSize(); /** * Sets the aggregated memory size for a cluster. @@ -35,6 +36,14 @@ void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize); /** + * Sets the maximum memory size needed over all stages for a cluster. + * + * @param maxMemoryOverStagesByteSize, + * the maximum memory size needed over all stages. + */ + void setMaxMemoryOverStagesByteSize(long maxMemoryOverStagesByteSize); + + /** * Sets the aggregated number of CPU cores for a cluster. * * @param aggregatedCores, -- To view, visit https://asterix-gerrit.ics.uci.edu/2299 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: [email protected]
