This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit c64436774387e80fd9b0ff6e9cd7d42c9aa7a961 Author: karthik <kmanivan...@maprtech.com> AuthorDate: Mon Jun 4 17:00:31 2018 -0700 DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified. This change fixes the incorrect accounting in the case where a column is being projected more than once. closes #1375 --- .../impl/project/OutputWidthExpression.java | 17 ++++---- .../physical/impl/project/OutputWidthVisitor.java | 2 +- .../impl/project/OutputWidthVisitorState.java | 7 +--- .../impl/project/ProjectMemoryManager.java | 48 ++++++++++------------ .../physical/impl/project/ProjectRecordBatch.java | 22 +++++----- 5 files changed, 42 insertions(+), 54 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java index b9240d6..84a3f46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java @@ -95,30 +95,31 @@ public abstract class OutputWidthExpression { } /** - * VarLenReadExpr captures the name of a variable length column that is used (read) in an expression. - * The captured name will be used to lookup the average entry size for the column in the corresponding + * VarLenReadExpr captures the inputColumnName and the readExpression used to read a variable length column. + * The captured inputColumnName will be used to lookup the average entry size for the column in the corresponding. + * If inputColumnName is null then the readExpression is used to get the name of the column. 
* {@link org.apache.drill.exec.record.RecordBatchSizer} */ public static class VarLenReadExpr extends OutputWidthExpression { ValueVectorReadExpression readExpression; - String name; + String inputColumnName; public VarLenReadExpr(ValueVectorReadExpression readExpression) { this.readExpression = readExpression; - this.name = null; + this.inputColumnName = null; } - public VarLenReadExpr(String name) { + public VarLenReadExpr(String inputColumnName) { this.readExpression = null; - this.name = name; + this.inputColumnName = inputColumnName; } public ValueVectorReadExpression getReadExpression() { return readExpression; } - public String getName() { - return name; + public String getInputColumnName() { + return inputColumnName; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java index cb58795..70908bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java @@ -205,7 +205,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre @Override public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state) throws RuntimeException { - String columnName = varLenReadExpr.getName(); + String columnName = varLenReadExpr.getInputColumnName(); if (columnName == null) { TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId(); columnName = TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java index c0e0cb1..e18c827 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java @@ -20,18 +20,13 @@ package org.apache.drill.exec.physical.impl.project; public class OutputWidthVisitorState { ProjectMemoryManager manager; - ProjectMemoryManager.OutputColumnType outputColumnType; - public OutputWidthVisitorState(ProjectMemoryManager manager, ProjectMemoryManager.OutputColumnType outputColumnType) { + public OutputWidthVisitorState(ProjectMemoryManager manager) { this.manager = manager; - this.outputColumnType = outputColumnType; } public ProjectMemoryManager getManager() { return manager; } - public ProjectMemoryManager.OutputColumnType getOutputColumnType() { - return outputColumnType; - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java index f461b09..03c849c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.project; +import com.google.common.base.Preconditions; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -88,15 +89,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { } class ColumnWidthInfo { - //MaterializedField materializedField; OutputWidthExpression outputExpression; int width; WidthType widthType; OutputColumnType outputColumnType; - String name; - ColumnWidthInfo(ValueVector vv, - OutputWidthExpression outputWidthExpression, + ColumnWidthInfo(OutputWidthExpression outputWidthExpression, 
OutputColumnType outputColumnType, WidthType widthType, int fieldWidth) { @@ -104,8 +102,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { this.width = fieldWidth; this.outputColumnType = outputColumnType; this.widthType = widthType; - String columnName = vv.getField().getName(); - this.name = columnName; } public OutputWidthExpression getOutputExpression() { return outputExpression; } @@ -116,7 +112,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { public int getWidth() { return width; } - public String getName() { return name; } } void ShouldNotReachHere() { @@ -180,43 +175,44 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { } - void addTransferField(ValueVector vvOut, String path) { - addField(vvOut, null, OutputColumnType.TRANSFER, path); + void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) { + addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName); } - void addNewField(ValueVector vv, LogicalExpression logicalExpression) { - addField(vv, logicalExpression, OutputColumnType.NEW, null); + void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) { + addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName()); } - void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, String path) { + void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, + String inputColumnName, String outputColumnName) { if(isFixedWidth(vv)) { addFixedWidthField(vv); } else { - addVariableWidthField(vv, logicalExpression, outputColumnType, path); + addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName); } } private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression, - OutputColumnType outputColumnType, String path) { + OutputColumnType 
outputColumnType, String inputColumnName, String outputColumnName) { variableWidthColumnCount++; ColumnWidthInfo columnWidthInfo; //Variable width transfers if(outputColumnType == OutputColumnType.TRANSFER) { - String columnName = path; - VarLenReadExpr readExpr = new VarLenReadExpr(columnName); - columnWidthInfo = new ColumnWidthInfo(vv, readExpr, outputColumnType, + VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName); + columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType, WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer } else if (isComplex(vv.getField().getType())) { addComplexField(vv); return; } else { // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions - OutputWidthVisitorState state = new OutputWidthVisitorState(this, outputColumnType); + OutputWidthVisitorState state = new OutputWidthVisitorState(this); OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state); - columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, outputColumnType, + columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType, WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression } - outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo); + ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo); + Preconditions.checkState(existingInfo == null); } void addComplexField(ValueVector vv) { @@ -258,8 +254,8 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { setRecordBatchSizer(batchSizer); rowWidth = 0; int totalVariableColumnWidth = 0; - for (String expr : outputColumnSizes.keySet()) { - ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr); + for (String outputColumnName : outputColumnSizes.keySet()) { + ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName); int width = -1; if (columnWidthInfo.isFixedWidth()) { 
// fixed width columns are accumulated in totalFixedWidthColumnWidth @@ -269,12 +265,10 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { //As the tree is walked, the RecordBatchSizer and function annotations //are looked-up to come up with the final FixedLenExpr OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression(); - OutputColumnType columnType = columnWidthInfo.getOutputColumnType(); - OutputWidthVisitorState state = new OutputWidthVisitorState(this, columnType); + OutputWidthVisitorState state = new OutputWidthVisitorState(this); OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state); - assert reducedExpr instanceof FixedLenExpr; width = ((FixedLenExpr)reducedExpr).getWidth(); - assert width >= 0; + Preconditions.checkState(width >= 0); } totalVariableColumnWidth += width; } @@ -301,7 +295,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}" + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms" + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(), - totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth, + rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth, (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch); logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 4bc63c0..dd93325 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -113,11 +113,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); - - // get the output batch size from config. - int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - - memoryManager = new ProjectMemoryManager(configuredBatchSize); } @Override @@ -367,6 +362,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException { long setupNewSchemaStartTime = System.currentTimeMillis(); + // get the output batch size from config. + int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + memoryManager = new ProjectMemoryManager(configuredBatchSize); memoryManager.init(incomingBatch, this); if (allocationVectors != null) { for (final ValueVector v : allocationVectors) { @@ -431,7 +429,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), vvIn.getField().getType()), callBack); final TransferPair tp = vvIn.makeTransferPair(vvOut); - memoryManager.addTransferField(vvIn, vvIn.getField().getName()); + memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName()); transfers.add(tp); } } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors @@ -513,7 +511,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(), vectorRead.getMajorType()), callBack); 
final TransferPair tp = vvIn.makeTransferPair(vvOut); - memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch)); + memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName()); transfers.add(tp); transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); } else if (expr instanceof DrillFuncHolderExpr && @@ -540,13 +538,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { memoryManager.addComplexField(null); // this will just add an estimate to the row width } else { // need to do evaluation. - final ValueVector vector = container.addOrGet(outputField, callBack); - allocationVectors.add(vector); + final ValueVector ouputVector = container.addOrGet(outputField, callBack); + allocationVectors.add(ouputVector); final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); - final boolean useSetSafe = !(vector instanceof FixedWidthVector); + final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); - memoryManager.addNewField(vector, write); + memoryManager.addNewField(ouputVector, write); // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. if (expr instanceof ValueVectorReadExpression) { @@ -555,7 +553,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final TypedFieldId id = vectorRead.getFieldId(); final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); - vvIn.makeTransferPair(vector); + vvIn.makeTransferPair(ouputVector); } } }