This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c64436774387e80fd9b0ff6e9cd7d42c9aa7a961
Author: karthik <kmanivan...@maprtech.com>
AuthorDate: Mon Jun 4 17:00:31 2018 -0700

    DRILL-6594: Data batches for Project operator are not being split properly 
and exceed the maximum specified
    
    This change fixes the incorrect accounting in the case where a column is 
being projected more than once
    
    closes #1375
---
 .../impl/project/OutputWidthExpression.java        | 17 ++++----
 .../physical/impl/project/OutputWidthVisitor.java  |  2 +-
 .../impl/project/OutputWidthVisitorState.java      |  7 +---
 .../impl/project/ProjectMemoryManager.java         | 48 ++++++++++------------
 .../physical/impl/project/ProjectRecordBatch.java  | 22 +++++-----
 5 files changed, 42 insertions(+), 54 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d6..84a3f46 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public abstract class OutputWidthExpression {
     }
 
     /**
-     * VarLenReadExpr captures the name of a variable length column that is 
used (read) in an expression.
-     * The captured name will be used to lookup the average entry size for the 
column in the corresponding
+     * VarLenReadExpr captures the inputColumnName and the readExpression used 
to read a variable length column.
+     * The captured inputColumnName will be used to lookup the average entry 
size for the column in the corresponding.
+     * If inputColumnName is null then the readExpression is used to get the 
name of the column.
      * {@link org.apache.drill.exec.record.RecordBatchSizer}
      */
     public static class VarLenReadExpr extends OutputWidthExpression  {
         ValueVectorReadExpression readExpression;
-        String name;
+        String inputColumnName;
 
         public VarLenReadExpr(ValueVectorReadExpression readExpression) {
             this.readExpression = readExpression;
-            this.name = null;
+            this.inputColumnName = null;
         }
 
-        public VarLenReadExpr(String name) {
+        public VarLenReadExpr(String inputColumnName) {
             this.readExpression = null;
-            this.name = name;
+            this.inputColumnName = inputColumnName;
         }
 
         public ValueVectorReadExpression getReadExpression() {
             return readExpression;
         }
 
-        public String getName() {
-            return name;
+        public String getInputColumnName() {
+            return inputColumnName;
         }
 
         @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb58795..70908bf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public class OutputWidthVisitor extends 
AbstractExecExprVisitor<OutputWidthExpre
     @Override
     public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr 
varLenReadExpr, OutputWidthVisitorState state)
                                                         throws 
RuntimeException {
-        String columnName = varLenReadExpr.getName();
+        String columnName = varLenReadExpr.getInputColumnName();
         if (columnName == null) {
             TypedFieldId fieldId = 
varLenReadExpr.getReadExpression().getTypedFieldId();
             columnName =  TypedFieldId.getPath(fieldId, 
state.manager.getIncomingBatch());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1..e18c827 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@ package org.apache.drill.exec.physical.impl.project;
 public class OutputWidthVisitorState {
 
     ProjectMemoryManager manager;
-    ProjectMemoryManager.OutputColumnType outputColumnType;
 
-    public OutputWidthVisitorState(ProjectMemoryManager manager, 
ProjectMemoryManager.OutputColumnType outputColumnType) {
+    public OutputWidthVisitorState(ProjectMemoryManager manager) {
         this.manager = manager;
-        this.outputColumnType = outputColumnType;
     }
 
     public ProjectMemoryManager getManager() {
         return manager;
     }
 
-    public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
-        return outputColumnType;
-    }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b09..03c849c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
     }
 
     class ColumnWidthInfo {
-        //MaterializedField materializedField;
         OutputWidthExpression outputExpression;
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
-        String name;
 
-        ColumnWidthInfo(ValueVector vv,
-                        OutputWidthExpression outputWidthExpression,
+        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
                         int fieldWidth) {
@@ -104,8 +102,6 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
-            String columnName = vv.getField().getName();
-            this.name = columnName;
         }
 
         public OutputWidthExpression getOutputExpression() { return 
outputExpression; }
@@ -116,7 +112,6 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
 
         public int getWidth() { return width; }
 
-        public String getName() { return name; }
     }
 
     void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
     }
 
 
-    void addTransferField(ValueVector vvOut, String path) {
-        addField(vvOut, null, OutputColumnType.TRANSFER, path);
+    void addTransferField(ValueVector vvIn, String inputColumnName, String 
outputColumnName) {
+        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, 
outputColumnName);
     }
 
-    void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
-        addField(vv, logicalExpression, OutputColumnType.NEW, null);
+    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, 
vvOut.getField().getName());
     }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType, String path) {
+    void addField(ValueVector vv, LogicalExpression logicalExpression, 
OutputColumnType outputColumnType,
+                  String inputColumnName, String outputColumnName) {
         if(isFixedWidth(vv)) {
             addFixedWidthField(vv);
         } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, 
path);
+            addVariableWidthField(vv, logicalExpression, outputColumnType, 
inputColumnName, outputColumnName);
         }
     }
 
     private void addVariableWidthField(ValueVector vv, LogicalExpression 
logicalExpression,
-                                       OutputColumnType outputColumnType, 
String path) {
+                                       OutputColumnType outputColumnType, 
String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
-            String columnName = path;
-            VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
-            columnWidthInfo = new ColumnWidthInfo(vv, readExpr, 
outputColumnType,
+            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained 
from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
         } else {
             // Walk the tree of LogicalExpressions to get a tree of 
OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this, 
outputColumnType);
+            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = 
logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, 
outputColumnType,
+            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, 
outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained 
from the OutputWidthExpression
         }
-        outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, 
columnWidthInfo);
+        Preconditions.checkState(existingInfo == null);
     }
 
     void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
         setRecordBatchSizer(batchSizer);
         rowWidth = 0;
         int totalVariableColumnWidth = 0;
-        for (String expr : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+        for (String outputColumnName : outputColumnSizes.keySet()) {
+            ColumnWidthInfo columnWidthInfo = 
outputColumnSizes.get(outputColumnName);
             int width = -1;
             if (columnWidthInfo.isFixedWidth()) {
                 // fixed width columns are accumulated in 
totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
                 //As the tree is walked, the RecordBatchSizer and function 
annotations
                 //are looked-up to come up with the final FixedLenExpr
                 OutputWidthExpression savedWidthExpr = 
columnWidthInfo.getOutputExpression();
-                OutputColumnType columnType = 
columnWidthInfo.getOutputColumnType();
-                OutputWidthVisitorState state = new 
OutputWidthVisitorState(this, columnType);
+                OutputWidthVisitorState state = new 
OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new 
OutputWidthVisitor(), state);
-                assert reducedExpr instanceof FixedLenExpr;
                 width = ((FixedLenExpr)reducedExpr).getWidth();
-                assert width >= 0;
+                Preconditions.checkState(width >= 0);
             }
             totalVariableColumnWidth += width;
         }
@@ -301,7 +295,7 @@ public class ProjectMemoryManager extends 
RecordBatchMemoryManager {
         logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC 
{}, width {}, total fixed width {}"
                     + ", total variable width {}, total complex width {}, 
batchSizer time {} ms, update time {}  ms"
                     + ", manager {}, incoming {}",outPutRowCount, 
batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    totalFixedWidthColumnWidth, totalVariableColumnWidth, 
totalComplexColumnWidth,
+                    rowWidth, totalFixedWidthColumnWidth, 
totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - 
updateStartTime), this, incomingBatch);
 
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0..dd93325 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, 
final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
   }
 
   @Override
@@ -367,6 +362,9 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws 
SchemaChangeException {
     long setupNewSchemaStartTime = System.currentTimeMillis();
+    // get the output batch size from config.
+    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
     memoryManager.init(incomingBatch, this);
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
               final ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
                 vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), 
vvOut.getField().getName());
               transfers.add(tp);
             }
           } else if (value != null && value > 1) { // subsequent wildcards 
should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
           
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
             vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch));
+        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, 
incomingBatch), vvOut.getField().getName());
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
         memoryManager.addComplexField(null); // this will just add an estimate 
to the row width
       } else {
         // need to do evaluation.
-        final ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
+        final ValueVector ouputVector = container.addOrGet(outputField, 
callBack);
+        allocationVectors.add(ouputVector);
         final TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new 
ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write, 
ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(vector, write);
+        memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we 
still need to instantiate the output vector.
         if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
             final TypedFieldId id = vectorRead.getFieldId();
             final ValueVector vvIn = 
incomingBatch.getValueAccessorById(id.getIntermediateClass(),
                     id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(vector);
+            vvIn.makeTransferPair(ouputVector);
           }
         }
       }

Reply via email to