[email protected] has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2299

Change subject: PLEASE EDIT to provide a meaningful commit message!
......................................................................

PLEASE EDIT to provide a meaningful commit message!

The following commits from your working branch will be included:

commit 6fa8ba4ddfc5cd7745f7f670377bfc8f87563918
Author: Shiva <[email protected]>
Date:   Thu Jan 18 08:38:13 2018 -0800

    State-aware memory calculation

commit 04cb85ebecce27de62022a6647f54a17dc94b8c5
Author: Shiva <[email protected]>
Date:   Thu Sep 21 23:45:15 2017 -0700

    fixed bug

commit 8c96a008b809ef9d0b28ff75d927ea51ed8abe4c
Author: Shiva <[email protected]>
Date:   Tue Sep 19 22:49:58 2017 -0700

    addressed code review

commit a3de58bef92573216774e90f0e55eeb0cd3ab1e8
Author: shivajah <[email protected]>
Date:   Wed Jun 28 08:52:19 2017 -0700

    json added

Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
M asterixdb/asterix-app/pom.xml
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
14 files changed, 669 insertions(+), 11 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/99/2299/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index 1160aaa..e44767b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -248,4 +248,4 @@
         Boolean value = flags.get(flag);
         return value == null ? false : value.booleanValue();
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index dc00bea..1c0389a 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -283,6 +283,10 @@
   </profiles>
   <dependencies>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-control-cc</artifactId>
     </dependency>
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 4ecd978..0e51953 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -94,8 +94,7 @@
 
     public void compile(boolean optimize, boolean printRewrittenExpressions, 
boolean printLogicalPlan,
             boolean printOptimizedPlan, boolean printPhysicalOpsOnly, boolean 
generateBinaryRuntime, boolean printJob,
-            PlanFormat pformat)
-            throws Exception {
+            PlanFormat pformat) throws Exception {
         queryJobSpec = null;
         dmlJobs = null;
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java
new file mode 100644
index 0000000..a21f263
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/StageBasedRequiredCapacityVisitor.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.resource;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+
+public class StageBasedRequiredCapacityVisitor implements 
ILogicalOperatorVisitor<Void, Void> {
+
+
+
+    private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+
+    private Stage currentStage;
+    List<Stage> stages;
+    private final long numComputationPartitions;
+    private final long groupByMemorySize;
+    private final long joinMemorySize;
+    private final long sortMemorySize;
+    private final long frameSize;
+    private final IClusterCapacity clusterCapacity;
+    private final Set<ILogicalOperator> visitedOperators = new HashSet<>();
+    private long maxStagebasedMemory = 0L;
+    private int printC = 0;
+
+    public StageBasedRequiredCapacityVisitor(int numComputationPartitions, int 
sortFrameLimit, int groupFrameLimit,
+            int joinFrameLimit, int frameSize, IClusterCapacity 
clusterCapacity) {
+        this.numComputationPartitions = numComputationPartitions;
+        this.frameSize = frameSize;
+        this.groupByMemorySize = groupFrameLimit * (long) frameSize;
+        this.joinMemorySize = joinFrameLimit * (long) frameSize;
+        this.sortMemorySize = sortFrameLimit * (long) frameSize;
+        this.clusterCapacity = clusterCapacity;
+        this.clusterCapacity.setAggregatedCores(1); // At least one core is 
needed.
+        this.stages = new LinkedList<>();
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op,false);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op,false);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) 
throws AlgebricksException {
+        addToStage(op,false);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op,false);
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws 
AlgebricksException {
+        // Makes sure that the downstream of a replicate operator is only 
visited once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            addToStage(op, true);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        // Makes sure that the downstream of a split operator is only visited 
once.
+        if (!visitedOperators.contains(op)) {
+            visitedOperators.add(op);
+            addToStage(op, true);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void 
arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) 
throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws 
AlgebricksException {
+        calculateMemoryUsageForExchange(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        printResults();
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) 
throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, 
Void arg) throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void 
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void 
arg)
+            throws AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws 
AlgebricksException {
+        addToStage(op, true);
+        return null;
+    }
+
+    // Calculates the memory usage for exchange operators.
+    private void calculateMemoryUsageForExchange(ExchangeOperator op) throws 
AlgebricksException {
+        IPhysicalOperator physicalOperator = op.getPhysicalOperator();
+        PhysicalOperatorTag physicalOperatorTag = 
physicalOperator.getOperatorTag();
+        if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
+                || physicalOperatorTag == 
PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
+            addOutputBuffer(op);
+        }
+        else {
+            long opMem = 2L * MAX_BUFFER_PER_CONNECTION * 
numComputationPartitions * numComputationPartitions * frameSize;
+            currentStage.addToMemory(opMem);
+            setMaxStageMemory();
+        }
+        addToStage(op, false);
+    }
+
+    // Calculates the cluster-wide memory usage for blocking activities like 
group-by, sort, and join.
+    private void calculateMemoryUsageForBlockingOperators(ILogicalOperator op, 
long memSize)
+            throws AlgebricksException {
+        long opMem;
+        if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            opMem = memSize * numComputationPartitions;
+        } else {
+            opMem = memSize;
+        }
+        currentStage.addToMemory(opMem);
+        setMaxStageMemory();
+    }
+
+    // Recursively visits input operators of an operator and sets the CPU core 
usage.
+    private void addToStage(ILogicalOperator op, boolean toAddOutputBuffer) 
throws AlgebricksException {
+        setAvailableCores(op);
+
+        switch(op.getOperatorTag()){//toAddOutputBuffer for blocked operators 
is always false
+            case INNERJOIN:
+            case LEFTOUTERJOIN: {
+                PhasedOperator pOp1 = new PhasedOperator(op, 1);
+                currentStage.getOperators().add(pOp1);
+                calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+                
op.getInputs().get(op.getPhaseToInput().get(1)).getValue().accept(this, null);
+                PhasedOperator pOp0 = new PhasedOperator(op, 0);
+                createStage(pOp0);
+                calculateMemoryUsageForBlockingOperators(op, joinMemorySize);
+                
op.getInputs().get(op.getPhaseToInput().get(0)).getValue().accept(this, null);
+                break;
+            }
+            case GROUP: {
+                addToStageOrderGroupby(op, groupByMemorySize);
+                break;
+            }
+            case ORDER: {
+                addToStageOrderGroupby(op, sortMemorySize);
+                break;
+            }
+            default: {
+                PhasedOperator pOp = new PhasedOperator(op, 0);
+                if (stages.isEmpty()) {
+                    createStage(pOp);
+                } else {
+                    currentStage.getOperators().add(pOp);
+                }
+                if (toAddOutputBuffer) {
+                    addOutputBuffer(op);
+                }
+                for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+                    inputOpRef.getValue().accept(this, null);
+                }
+                break;
+            }
+        }
+    }
+
+    private void createStage(PhasedOperator pOp){
+        currentStage = new Stage();
+        currentStage.rootOperator = pOp;
+        currentStage.getOperators().add(pOp);
+        stages.add(currentStage);
+    }
+
+    private void addToStageOrderGroupby(ILogicalOperator op, long 
memory)throws AlgebricksException{
+        PhasedOperator pOp1 = new PhasedOperator(op, 1);
+        currentStage.getOperators().add(pOp1);
+        calculateMemoryUsageForBlockingOperators(op, memory);
+        PhasedOperator pOp0 = new PhasedOperator(op, 0);
+        createStage(pOp0);
+        calculateMemoryUsageForBlockingOperators(op, memory);
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            inputOpRef.getValue().accept(this, null);
+        }
+    }
+
+    // Adds output buffer for an operator.
+    private void addOutputBuffer(ILogicalOperator op) {
+        long opMem;
+        if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            opMem = frameSize * numComputationPartitions; // every operator 
needs one output buffer.
+        } else {
+            opMem = frameSize; // every operator needs one output buffer.
+        }
+        currentStage.addToMemory(opMem);
+        setMaxStageMemory();
+    }
+
+    // Sets the number of available cores
+    private void setAvailableCores(ILogicalOperator op) {
+        if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.LOCAL) {
+            clusterCapacity.setAggregatedCores((int) numComputationPartitions);
+        }
+    }
+    private void setMaxStageMemory(){
+        if (currentStage.getRequiredMemory() > maxStagebasedMemory) {
+            maxStagebasedMemory = currentStage.getRequiredMemory();
+            
clusterCapacity.setMaxMemoryOverStagesByteSize(maxStagebasedMemory);
+        }
+    }
+
+    public class Stage {
+        private long requiredMemory = 0L;
+        private List<PhasedOperator> operators;
+
+
+        private PhasedOperator rootOperator;
+
+        public long getRequiredMemory() {
+            return requiredMemory;
+        }
+        public Stage(){
+            operators =  new LinkedList<>();
+        }
+
+        public List<PhasedOperator> getOperators() {
+            return operators;
+        }
+
+        public void setOperators(List<PhasedOperator> operators) {
+            this.operators = operators;
+        }
+
+        public void addToMemory(long opMem){
+            this.requiredMemory += opMem;
+        }
+        public PhasedOperator getRootOperator() {
+            return rootOperator;
+        }
+
+    }
+
+    public class PhasedOperator{
+        ILogicalOperator op;
+        int phase;
+        public PhasedOperator(ILogicalOperator op, int phase){
+            this.op = op;
+            this.phase = phase;
+        }
+    }
+
+    public void printResults(){
+        String FILENAME = "/tmp/memoryData.txt";
+        BufferedWriter bw = null;
+        FileWriter fw = null;
+        String data="";
+        try {
+            if (printC == 0) {
+                 data = "Stages: ";
+                String stagesStr = "";
+                for (Stage s : stages) {
+                    stagesStr +=
+                            "Root operator: " + 
s.getRootOperator().op.getOperatorTag() + ":" + s.getRootOperator().phase + 
",\n";
+                    stagesStr += "Operators: ";
+                    int i = 0;
+                    for (PhasedOperator op : s.getOperators()) {
+                        if (i > 0)
+                            stagesStr += ",";
+                        stagesStr += "{ ";
+                        stagesStr += (op.op.getOperatorTag() == 
LogicalOperatorTag.EXCHANGE) ?
+                                ((ExchangeOperator) 
op.op).getPhysicalOperator().getOperatorTag() :
+                                op.op.getOperatorTag();
+                        stagesStr += ":" + op.phase + "(" + 
op.op.getExecutionMode() + ")}";
+                        i++;
+                    }
+                    stagesStr += "\n";
+                    stagesStr += "req-mem: " + s.requiredMemory + "\n =====\n";
+                }
+                data += stagesStr;
+                printC++;
+            }
+            data += "#Cores: "+clusterCapacity.getAggregatedCores()+
+                    ",\t Conservative memory(bytes): 
"+clusterCapacity.getAggregatedMemoryByteSize()+",\t Stage-aware memory(bytes) 
: "+clusterCapacity.getMaxMemoryOverStagesByteSize() +
+                    "\t Conservative/stage-aware : "+ 
clusterCapacity.getAggregatedMemoryByteSize()/clusterCapacity.getMaxMemoryOverStagesByteSize()+"\n";
+            File file = new File(FILENAME);
+
+            // if file doesnt exists, then create it
+            if (!file.exists()) {
+                file.createNewFile();
+            }
+
+            // true = append file
+            fw = new FileWriter(file.getAbsoluteFile(), true);
+            bw = new BufferedWriter(fw);
+
+            bw.write(data);
+
+        } catch (IOException e) {
+
+            e.printStackTrace();
+
+        } finally {
+
+            try {
+
+                if (bw != null)
+                    bw.close();
+
+                if (fw != null)
+                    fw.close();
+
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 61c1dfe..0c11088 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.utils;
 
 import org.apache.asterix.app.resource.RequiredCapacityVisitor;
+import org.apache.asterix.app.resource.StageBasedRequiredCapacityVisitor;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -63,7 +64,11 @@
 
         // There could be only one root operator for a top-level query plan.
         ILogicalOperator rootOp = plan.getRoots().get(0).getValue();
+
+        StageBasedRequiredCapacityVisitor stageVisitor = new 
StageBasedRequiredCapacityVisitor(computationLocations.getLocations().length,
+                sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, 
clusterCapacity);
         rootOp.accept(visitor, null);
+        rootOp.accept(stageVisitor,null);
         return clusterCapacity;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 707a7db..2c825ad 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -36,6 +36,8 @@
 
 public interface ILogicalOperator {
 
+    public Map<Integer, Integer> getPhaseToInput();
+
     public LogicalOperatorTag getOperatorTag();
 
     public ExecutionMode getExecutionMode();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java
index 8e5ce06..a962a38 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractBinaryJoinOperator.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
@@ -33,6 +35,12 @@
     protected final Mutable<ILogicalExpression> condition;
     protected JoinKind joinKind;
 
+    public Map<Integer, Integer> getPhaseToInput() {
+        return phaseToInput;
+    }
+
+    protected Map<Integer,Integer> phaseToInput;
+
     public enum JoinKind {
         INNER,
         LEFT_OUTER
@@ -41,6 +49,9 @@
     public AbstractBinaryJoinOperator(JoinKind joinKind, 
Mutable<ILogicalExpression> condition) {
         this.joinKind = joinKind;
         this.condition = condition;
+        this.phaseToInput = new HashMap<>();
+        phaseToInput.put(0,1);
+        phaseToInput.put(1,0);
     }
 
     public AbstractBinaryJoinOperator(JoinKind joinKind, 
Mutable<ILogicalExpression> condition,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 4686f32..8f6d28c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -18,10 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -60,10 +57,11 @@
          */
         LOCAL
     }
+    Map<Integer,Integer> phaseToInput = new HashMap<>();
 
     private AbstractLogicalOperator.ExecutionMode mode = 
AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
     protected IPhysicalOperator physicalOperator;
-    private final Map<String, Object> annotations = new HashMap<>();
+    private final Map<String, Object> annotations = new 
IdentityHashMap<String, Object>();
     private boolean bJobGenEnabled = true;
 
     protected final List<Mutable<ILogicalOperator>> inputs;
@@ -74,6 +72,9 @@
     }
 
     @Override
+    public Map<Integer,Integer> getPhaseToInput(){ return phaseToInput; }
+
+    @Override
     public abstract LogicalOperatorTag getOperatorTag();
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 4283198..9af89b5 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -683,4 +683,4 @@
         }
         buffer.append("]");
     }
-}
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
index 67640c5..f17ec10 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
@@ -37,4 +37,4 @@
             throws AlgebricksException {
         AbstractLogicalOperatorPrettyPrintVisitor.printPhysicalOps(plan, out, 
indent);
     }
-}
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
index cd6362f..4ced5a6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hyracks.api.client.impl;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -149,6 +154,8 @@
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(acg.toJSON().asText());
         }
+        printResults(acg);
+
         return acg;
     }
 
@@ -169,4 +176,42 @@
             eqSetMap.put(t, mergedSet);
         }
     }
+
+    private void printResults(ActivityClusterGraph acg ){
+        String FILENAME = "/tmp/hyracks_stages.txt";
+        BufferedWriter bw = null;
+        FileWriter fw = null;
+        try {
+            File file = new File(FILENAME);
+
+            // if file doesnt exists, then create it
+            if (!file.exists()) {
+                file.createNewFile();
+            }
+
+            // true = append file
+            fw = new FileWriter(file.getAbsoluteFile(), true);
+            bw = new BufferedWriter(fw);
+
+            bw.write(new 
ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(acg.toJSON()));
+
+        } catch (IOException e) {
+
+            e.printStackTrace();
+
+        } finally {
+
+            try {
+
+                if (bw != null)
+                    bw.close();
+
+                if (fw != null)
+                    fw.close();
+
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 9f66080..82cdfe0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
@@ -371,12 +372,38 @@
                         }
                     }
                 }
+                int i = 0;
+                ObjectNode inp = om.createObjectNode();
+                if(!opInputMap.isEmpty() && opInputMap.containsKey(key)) {
+                    Pair<Pair<IOperatorDescriptor, Integer>, 
Pair<IOperatorDescriptor, Integer>> connection;
+                    if (opInputMap.get(key).size()==2){
+                       connection = 
connectorOpMap.get(opInputMap.get(key).get(1).getConnectorId());
+                        if (connection != null) {
+                            inp.put("build-input", 
connection.getLeft().getLeft().getOperatorId().toString());
+                        }
+                        connection = 
connectorOpMap.get(opInputMap.get(key).get(0).getConnectorId());
+                        if (connection != null) {
+                            inp.put("probe-input", 
connection.getLeft().getLeft().getOperatorId().toString());
+                        }
+                    }
+                   else{
+                        connection = 
connectorOpMap.get(opInputMap.get(key).get(0).getConnectorId());
+                        if (connection != null) {
+                            inp.put("input", 
connection.getLeft().getLeft().getOperatorId().toString());
+                        }
+                    }
+                }
                 if (pleObject.size() > 0) {
                     pcObject.set("location", pleObject);
                 }
+
                 if (pcObject.size() > 0) {
                     op.set("partition-constraints", pcObject);
                 }
+
+                if(inp.size() > 0) {
+                    op.set("inputs", inp);
+                }
             }
             jopArray.add(op);
         });
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
index 25e3255..e9dc2b8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -28,11 +28,17 @@
 
 public class ClusterCapacity implements IClusterCapacity {
 
+
+
+    private long maxMemoryOverStagesByteSize = 0L;
     private long aggregatedMemoryByteSize = 0;
     private int aggregatedCores = 0;
     private final Map<String, Long> nodeMemoryMap = new HashMap<>();
     private final Map<String, Integer> nodeCoreMap = new HashMap<>();
-
+    @Override
+    public long getMaxMemoryOverStagesByteSize() {
+        return maxMemoryOverStagesByteSize;
+    }
     @Override
     public long getAggregatedMemoryByteSize() {
         return aggregatedMemoryByteSize;
@@ -70,6 +76,11 @@
     }
 
     @Override
+    public void setMaxMemoryOverStagesByteSize(long 
maxMemoryOverStagesByteSize) {
+        this.maxMemoryOverStagesByteSize = maxMemoryOverStagesByteSize;
+    }
+
+    @Override
     public void setMemoryByteSize(String nodeId, long memoryByteSize) {
         nodeMemoryMap.put(nodeId, memoryByteSize);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
index ac3261d..b848a7f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IClusterCapacity.java
@@ -25,6 +25,7 @@
  * This interface abstracts the mutable capacity for a cluster.
  */
 public interface IClusterCapacity extends IReadOnlyClusterCapacity {
+    public long getMaxMemoryOverStagesByteSize();
 
     /**
      * Sets the aggregated memory size for a cluster.
@@ -35,6 +36,14 @@
     void setAggregatedMemoryByteSize(long aggregatedMemoryByteSize);
 
     /**
+     * Sets the maximum memory size needed over all stages for a cluster.
+     *
+     * @param maxMemoryOverStagesByteSize,
+     *            the maximum memory size needed over all stages.
+     */
+    void setMaxMemoryOverStagesByteSize(long maxMemoryOverStagesByteSize);
+
+    /**
      * Sets the aggregated number of CPU cores for a cluster.
      *
      * @param aggregatedCores,

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2299
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic715c5733621e27049677f44e1ddaa0dd2c71baf
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: [email protected]

Reply via email to