Repository: drill Updated Branches: refs/heads/master 1a8430eac -> 3d92d2829
DRILL-4679: When convert() functions are present, ensure that ProjectRecordBatch produces a schema even for an empty result set. Add unit tests. Modify doAlloc() to accept a record count parameter (addresses review comment). Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bd6079cb Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bd6079cb Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bd6079cb Branch: refs/heads/master Commit: bd6079cb42f68d03922004abdb5f9182b8cb8caf Parents: 1a8430e Author: Aman Sinha <asi...@maprtech.com> Authored: Tue May 17 14:35:06 2016 -0700 Committer: Aman Sinha <asi...@maprtech.com> Committed: Fri May 20 13:52:04 2016 -0700 ---------------------------------------------------------------------- .../expr/fn/DrillComplexWriterFuncHolder.java | 4 ++ .../impl/project/ProjectRecordBatch.java | 40 ++++++++++-- .../physical/impl/TestConvertFunctions.java | 69 ++++++++++++++++++++ 3 files changed, 109 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/bd6079cb/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java index 747a08b..a0bf134 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java @@ -44,6 +44,10 @@ public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder{ this.ref = ref; } + public FieldReference getReference() { + return ref; + } + @Override protected HoldingContainer 
generateEvalBody(ClassGenerator<?> g, HoldingContainer[] inputVariables, String body, JVar[] workspaceJVars) { http://git-wip-us.apache.org/repos/asf/drill/blob/bd6079cb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 5ba7b5a..4ad5b8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -63,6 +63,7 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntHashSet; @@ -76,12 +77,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { private Projector projector; private List<ValueVector> allocationVectors; private List<ComplexWriter> complexWriters; + private List<DrillComplexWriterFuncHolder> complexExprList; private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; private static final String EMPTY_STRING = ""; private boolean first = true; + private boolean wasNone = false; // whether a NONE iter outcome was already seen private class ClassifierResult { public boolean isStar = false; @@ -121,6 +124,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override public IterOutcome innerNext() { + if (wasNone) { + return IterOutcome.NONE; + } recordCount = 0; if 
(hasRemainder) { handleRemainder(); @@ -136,6 +142,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override protected IterOutcome doWork() { + if (wasNone) { + return IterOutcome.NONE; + } + int incomingRecordCount = incoming.getRecordCount(); if (first && incomingRecordCount == 0) { @@ -146,6 +156,23 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { if (next == IterOutcome.OUT_OF_MEMORY) { outOfMemory = true; return next; + } else if (next == IterOutcome.NONE) { + // since this is first batch and we already got a NONE, need to set up the schema + if (!doAlloc(0)) { + outOfMemory = true; + return IterOutcome.OUT_OF_MEMORY; + } + setValueCount(0); + + // Only need to add the schema for the complex exprs because others should already have + // been setup during setupNewSchema + for (DrillComplexWriterFuncHolder f : complexExprList) { + container.addOrGet(f.getReference().getRootSegment().getPath(), + Types.required(MinorType.MAP), MapVector.class); + } + container.buildSchema(SelectionVectorMode.NONE); + wasNone = true; + return IterOutcome.OK_NEW_SCHEMA; } else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) { return next; } @@ -164,7 +191,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { container.zeroVectors(); - if (!doAlloc()) { + if (!doAlloc(incomingRecordCount)) { outOfMemory = true; return IterOutcome.OUT_OF_MEMORY; } @@ -193,7 +220,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { private void handleRemainder() { final int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - if (!doAlloc()) { + if (!doAlloc(remainingRecordCount)) { outOfMemory = true; return; } @@ -222,10 +249,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { complexWriters.add(writer); } - private boolean doAlloc() { + private boolean doAlloc(int recordCount) { //Allocate vv in the 
allocationVectors. for (final ValueVector v : this.allocationVectors) { - AllocationHelper.allocateNew(v, incoming.getRecordCount()); + AllocationHelper.allocateNew(v, recordCount); } //Allocate vv for complexWriters. @@ -417,6 +444,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); cg.addExpr(expr, false); + if (complexExprList == null) { + complexExprList = Lists.newArrayList(); + } + // save the expr for later for getting schema when input is empty + complexExprList.add((DrillComplexWriterFuncHolder)((DrillFuncHolderExpr)expr).getHolder()); } else { // need to do evaluation. final ValueVector vector = container.addOrGet(outputField, callBack); http://git-wip-us.apache.org/repos/asf/drill/blob/bd6079cb/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java index aab087d..8bf65d7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java @@ -148,6 +148,75 @@ public class TestConvertFunctions extends BaseTestQuery { .go(); } + @Test // DRILL-4679 + public void testConvertFromJson_drill4679() throws Exception { + Object mapVal1 = mapOf("y", "kevin", "z", "paul"); + Object mapVal2 = mapOf("y", "bill", "z", "peter"); + + // right side of union-all produces 0 rows due to FALSE filter, column t.x is a map + String query1 = String.format("select 'abc' as col1, 
convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " + + " where t.`integer` = 2010 " + + " union all " + + " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t" + + " where 1 = 0"); + + testBuilder() + .sqlQuery(query1) + .unOrdered() + .baselineColumns("col1", "col2", "col3") + .baselineValues("abc", mapVal1, "xyz") + .go(); + + // left side of union-all produces 0 rows due to FALSE filter, column t.x is a map + String query2 = String.format("select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " + + " where 1 = 0 " + + " union all " + + " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " + + " where t.`integer` = 2010"); + + testBuilder() + .sqlQuery(query2) + .unOrdered() + .baselineColumns("col1", "col2", "col3") + .baselineValues("abc", mapVal1, "xyz") + .go(); + + // sanity test where neither side produces 0 rows + String query3 = String.format("select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " + + " where t.`integer` = 2010 " + + " union all " + + " select 'abc' as col1, convert_from(convert_to(t.x, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t " + + " where t.`integer` = 2001"); + + testBuilder() + .sqlQuery(query3) + .unOrdered() + .baselineColumns("col1", "col2", "col3") + .baselineValues("abc", mapVal1, "xyz") + .baselineValues("abc", mapVal2, "xyz") + .go(); + + // convert_from() on a list, column t.rl is a repeated list + Object listVal1 = listOf(listOf(2l, 1l), listOf(4l, 6l)); + Object listVal2 = listOf(); // empty + + String query4 = String.format("select 'abc' as col1, convert_from(convert_to(t.rl, 'JSON'), 'JSON') as col2, 'xyz' as col3 from 
cp.`/store/json/input2.json` t " + + " union all " + + " select 'abc' as col1, convert_from(convert_to(t.rl, 'JSON'), 'JSON') as col2, 'xyz' as col3 from cp.`/store/json/input2.json` t" + + " where 1 = 0"); + + testBuilder() + .sqlQuery(query4) + .unOrdered() + .baselineColumns("col1", "col2", "col3") + .baselineValues("abc", listVal1, "xyz") + .baselineValues("abc", listVal2, "xyz") + .baselineValues("abc", listVal1, "xyz") + .baselineValues("abc", listVal1, "xyz") + .go(); + + } + @Test public void testConvertToComplexJSON() throws Exception {