DRILL-5590: Bugs in CSV field matching, null columns. Please see the problem and solution descriptions in DRILL-5590.
Also cleaned up some dead code left over from DRILL-5498. close #855 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dd55b5c4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dd55b5c4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dd55b5c4 Branch: refs/heads/master Commit: dd55b5c49c8e3207400b99ea616a032bc6172988 Parents: 33682be Author: Paul Rogers <prog...@maprtech.com> Authored: Thu Jun 15 22:46:56 2017 -0700 Committer: Aman Sinha <asi...@maprtech.com> Committed: Sat Jun 24 09:23:22 2017 -0700 ---------------------------------------------------------------------- .../compliant/CompliantTextRecordReader.java | 73 ++------------------ .../easy/text/compliant/FieldVarCharOutput.java | 73 +++++++++++--------- .../text/compliant/RepeatedVarCharOutput.java | 5 +- .../exec/store/easy/text/compliant/TestCsv.java | 22 ++++++ 4 files changed, 68 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java index 4a35c3b..7009584 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java @@ -17,15 +17,9 @@ */ package org.apache.drill.exec.store.easy.text.compliant; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.univocity.parsers.common.TextParsingException; 
-import io.netty.buffer.DrillBuf; - import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.Map; import javax.annotation.Nullable; @@ -36,16 +30,16 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.util.CallBack; -import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.mapred.FileSplit; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import org.apache.drill.exec.expr.TypeHelper; +import com.univocity.parsers.common.TextParsingException; + +import io.netty.buffer.DrillBuf; // New text reader, complies with the RFC 4180 standard for text/csv files public class CompliantTextRecordReader extends AbstractRecordReader { @@ -255,63 +249,4 @@ public class CompliantTextRecordReader extends AbstractRecordReader { logger.warn("Exception while closing stream.", e); } } - - /** - * TextRecordReader during its first phase read to extract header should pass its own - * OutputMutator to avoid reshaping query output. - * This class provides OutputMutator for header extraction. 
- */ - private class HeaderOutputMutator implements OutputMutator { - private final Map<String, ValueVector> fieldVectorMap = Maps.newHashMap(); - - @SuppressWarnings("resource") - @Override - public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException { - ValueVector v = fieldVectorMap.get(field); - if (v == null || v.getClass() != clazz) { - // Field does not exist add it to the map - v = TypeHelper.getNewVector(field, oContext.getAllocator()); - if (!clazz.isAssignableFrom(v.getClass())) { - throw new SchemaChangeException(String.format( - "Class %s was provided, expected %s.", clazz.getSimpleName(), v.getClass().getSimpleName())); - } - fieldVectorMap.put(field.getPath(), v); - } - return clazz.cast(v); - } - - @Override - public void allocate(int recordCount) { - //do nothing for now - } - - @Override - public boolean isNewSchema() { - return false; - } - - @Override - public DrillBuf getManagedBuffer() { - return null; - } - - @Override - public CallBack getCallBack() { - return null; - } - - /** - * Since this OutputMutator is passed by TextRecordReader to get the header out - * the mutator might not get cleaned up elsewhere. 
TextRecordReader will call - * this method to clear any allocations - */ - public void close() { - for (final ValueVector v : fieldVectorMap.values()) { - v.clear(); - } - fieldVectorMap.clear(); - } - - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java index 494c593..b8343d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java @@ -17,8 +17,16 @@ */ package org.apache.drill.exec.store.easy.text.compliant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; @@ -26,12 +34,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.vector.VarCharVector; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Arrays; -import java.util.List; - /** * Class is responsible for generating record batches for text file inputs. We generate * a record batch with a set of varchar vectors. 
A varchar vector contains all the field @@ -61,8 +63,9 @@ class FieldVarCharOutput extends TextOutput { private boolean rowHasData= false; private static final int MAX_FIELD_LENGTH = 1024 * 64; private int recordCount = 0; - private int batchIndex = 0; private int maxField = 0; + private int[] nullCols; + private byte nullValue[] = new byte[0]; /** * We initialize and add the varchar vector for each incoming field in this @@ -77,6 +80,7 @@ class FieldVarCharOutput extends TextOutput { int totalFields = fieldNames.length; List<String> outputColumns = new ArrayList<>(Arrays.asList(fieldNames)); + List<Integer> nullColumns = new ArrayList<>(); if (isStarQuery) { maxField = totalFields - 1; @@ -84,11 +88,14 @@ class FieldVarCharOutput extends TextOutput { Arrays.fill(selectedFields, true); } else { List<Integer> columnIds = new ArrayList<Integer>(); - String pathStr; - int index; + Map<String, Integer> headers = CaseInsensitiveMap.newHashMap(); + for (int i = 0; i < fieldNames.length; i++) { + headers.put(fieldNames[i], i); + } for (SchemaPath path : columns) { - pathStr = path.getRootSegment().getPath(); + int index; + String pathStr = path.getRootSegment().getPath(); if (pathStr.equals(COL_NAME) && path.getRootSegment().getChild() != null) { //TODO: support both field names and columns index along with predicate pushdown throw UserException @@ -98,12 +105,15 @@ class FieldVarCharOutput extends TextOutput { .addContext("column index", path.getRootSegment().getChild()) .build(logger); } else { - index = outputColumns.indexOf(pathStr); - if (index < 0) { + Integer value = headers.get(pathStr); + if (value == null) { // found col that is not a part of fieldNames, add it // this col might be part of some another scanner index = totalFields++; outputColumns.add(pathStr); + nullColumns.add(index); + } else { + index = value; } } columnIds.add(index); @@ -128,6 +138,12 @@ class FieldVarCharOutput extends TextOutput { this.fieldBytes = new byte[MAX_FIELD_LENGTH]; + // Keep 
track of the null columns to be filled in. + + nullCols = new int[nullColumns.size()]; + for (int i = 0; i < nullCols.length; i++) { + nullCols[i] = nullColumns.get(i); + } } /** @@ -135,11 +151,10 @@ class FieldVarCharOutput extends TextOutput { */ @Override public void startBatch() { - this.recordCount = 0; - this.batchIndex = 0; - this.currentFieldIndex= -1; - this.collect = true; - this.fieldOpen = false; + recordCount = 0; + currentFieldIndex= -1; + collect = true; + fieldOpen = false; } @Override @@ -173,7 +188,7 @@ class FieldVarCharOutput extends TextOutput { public boolean endField() { fieldOpen = false; - if(collect) { + if (collect) { assert currentVector != null; currentVector.getMutator().setSafe(recordCount, fieldBytes, 0, currentDataPointer); } @@ -192,25 +207,20 @@ class FieldVarCharOutput extends TextOutput { @Override public void finishRecord() { - if(fieldOpen){ + if (fieldOpen){ endField(); } + // Fill in null (really empty) values. + + for (int i = 0; i < nullCols.length; i++) { + vectors[nullCols[i]].getMutator().setSafe(recordCount, nullValue, 0, 0); + } recordCount++; } - // Sets the record count in this batch within the value vector @Override - public void finishBatch() { - batchIndex++; - - for (int i = 0; i <= maxField; i++) { - if (this.vectors[i] != null) { - this.vectors[i].getMutator().setValueCount(batchIndex); - } - } - - } + public void finishBatch() { } @Override public long getRecordCount() { @@ -221,5 +231,4 @@ class FieldVarCharOutput extends TextOutput { public boolean rowHasData() { return this.rowHasData; } - - } +} http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java index eda2feb..156d6c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java @@ -345,9 +345,6 @@ class RepeatedVarCharOutput extends TextOutput { return out; } - // Sets the record count in this batch within the value vector @Override - public void finishBatch() { - mutator.setValueCount(batchIndex); - } + public void finishBatch() { } } http://git-wip-us.apache.org/repos/asf/drill/blob/dd55b5c4/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java index 7d38cf9..c18adc9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java @@ -135,6 +135,28 @@ public class TestCsv extends ClusterTest { .verifyAndClear(actual); } + // Test fix for DRILL-5590 + @Test + public void testCsvHeadersCaseInsensitive() throws IOException { + String fileName = "case2.csv"; + buildFile(fileName, validHeaders); + String sql = "SELECT A, b, C FROM `dfs.data`.`" + fileName + "`"; + RowSet actual = client.queryBuilder().sql(sql).rowSet(); + + BatchSchema expectedSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("C", MinorType.VARCHAR) + .build(); + assertEquals(expectedSchema, actual.batchSchema()); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .add("10", "foo", "bar") + .build(); + new 
RowSetComparison(expected) + .verifyAndClear(actual); + } + private String makeStatement(String fileName) { return "SELECT * FROM `dfs.data`.`" + fileName + "`"; }