Repository: drill Updated Branches: refs/heads/master e7db9dcac -> ca5a8476f
DRILL-3871: Off by one error while reading binary fields with one terminal null in parquet. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ca5a8476 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ca5a8476 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ca5a8476 Branch: refs/heads/master Commit: ca5a8476fb67d1c6b51472ddd48d7d51bbb3703b Parents: e7db9dc Author: Parth Chandra <par...@apache.org> Authored: Mon Oct 5 10:25:56 2015 -0700 Committer: Parth Chandra <par...@apache.org> Committed: Mon Nov 2 14:46:05 2015 -0800 ---------------------------------------------------------------------- .../columnreaders/NullableColumnReader.java | 203 +++++++++++-------- .../physical/impl/writer/TestParquetWriter.java | 88 ++++++-- .../test/resources/parquet/all_nulls.parquet | Bin 0 -> 1258 bytes .../parquet/first_page_all_nulls.parquet | Bin 0 -> 2585 bytes .../parquet/first_page_one_null.parquet | Bin 0 -> 3899 bytes .../parquet/last_page_all_nulls.parquet | Bin 0 -> 3796 bytes .../parquet/last_page_one_null.parquet | Bin 0 -> 3899 bytes .../src/test/resources/parquet/no_nulls.parquet | Bin 0 -> 4706 bytes 8 files changed, 194 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java index d721601..9db87f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java @@ -29,117 +29,152 @@ import parquet.format.SchemaElement; import parquet.hadoop.metadata.ColumnChunkMetaData; abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{ - - int nullsFound; - // used to skip nulls found - int rightBitShift; - // used when copying less than a byte worth of data at a time, to indicate the number of used bits in the current byte - int bitsUsed; - BaseDataValueVector castedBaseVector; - NullableVectorDefinitionSetter castedVectorMutator; - long definitionLevelsRead; - long totalDefinitionLevelsRead; + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class); + protected BaseDataValueVector castedBaseVector; + protected NullableVectorDefinitionSetter castedVectorMutator; + private long definitionLevelsRead = 0; NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); castedBaseVector = (BaseDataValueVector) v; castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator(); - totalDefinitionLevelsRead = 0; } - - @Override - public void processPages(long recordsToReadInThisPass) throws IOException { - int indexInOutputVector = 0; + @Override public void processPages(long recordsToReadInThisPass) + throws IOException { readStartInBytes = 0; readLength = 0; readLengthInBits = 0; recordsReadInThisIteration = 0; vectorData = castedBaseVector.getBuffer(); - // values need to be spaced out where nulls appear in the column - // leaving blank space for nulls allows for random access to values - // to optimize copying data out of the buffered disk stream, runs of defined values - // are located and copied together, rather than copying individual values - - long runStart = pageReader.readPosInBytes; - int runLength; - int currentDefinitionLevel; - boolean lastValueWasNull; - boolean lastRunBrokenByNull = false; - while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector < valueVec.getValueCapacity()){ - // read a page if needed + // values need to be spaced out where nulls appear in the column + // leaving blank space for nulls allows for random access to values + // to optimize copying data out of the buffered disk stream, runs of defined values + // are located and copied together, rather than copying individual values + + int runLength = -1; // number of non-null records in this pass. + int nullRunLength = -1; // number of consecutive null records that we read. + int currentDefinitionLevel = -1; + int readCount = 0; // the record number we last read. + int writeCount = 0; // the record number we last wrote to the value vector. + // This was previously the indexInOutputVector variable + boolean haveMoreData; // true if we have more data and have not filled the vector + + while (readCount < recordsToReadInThisPass && writeCount < valueVec.getValueCapacity()) { + // read a page if needed if (!pageReader.hasPage() - || ((readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) && - definitionLevelsRead >= pageReader.currentPageCount)) { - if (!pageReader.next()) { - break; - } - definitionLevelsRead = 0; + || (definitionLevelsRead >= pageReader.currentPageCount)) { + if (!pageReader.next()) { + break; } - lastValueWasNull = true; - runLength = 0; - if (lastRunBrokenByNull ) { - nullsFound = 1; - lastRunBrokenByNull = false; - } else { - nullsFound = 0; - } - // loop to find the longest run of defined values available, can be preceded by several nulls - while(indexInOutputVector < recordsToReadInThisPass - && indexInOutputVector < valueVec.getValueCapacity() - && definitionLevelsRead < pageReader.currentPageCount) { + //New page. Reset the definition level. + currentDefinitionLevel = -1; + definitionLevelsRead = 0; + recordsReadInThisIteration = 0; + } + + nullRunLength = 0; + runLength = 0; + + // + // Let's skip the next run of nulls if any ... + // + + // If we are reentering this loop, the currentDefinitionLevel has already been read + if (currentDefinitionLevel < 0) { + currentDefinitionLevel = pageReader.definitionLevels.readInteger(); + } + haveMoreData = readCount < recordsToReadInThisPass + && writeCount + nullRunLength < valueVec.getValueCapacity() + && definitionLevelsRead < pageReader.currentPageCount; + while (haveMoreData && currentDefinitionLevel < columnDescriptor + .getMaxDefinitionLevel()) { + readCount++; + nullRunLength++; + definitionLevelsRead++; + haveMoreData = readCount < recordsToReadInThisPass + && writeCount + nullRunLength < valueVec.getValueCapacity() + && definitionLevelsRead < pageReader.currentPageCount; + if (haveMoreData) { currentDefinitionLevel = pageReader.definitionLevels.readInteger(); - definitionLevelsRead++; - indexInOutputVector++; - totalDefinitionLevelsRead++; - if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){ - // a run of non-null values was found, break out of this loop to do a read in the outer loop - if ( ! lastValueWasNull ){ - lastRunBrokenByNull = true; - break; - } - nullsFound++; - lastValueWasNull = true; - } - else{ - if (lastValueWasNull){ - runLength = 0; - lastValueWasNull = false; - } - runLength++; - castedVectorMutator.setIndexDefined(indexInOutputVector - 1); - } } - valuesReadInCurrentPass += nullsFound; + } + // + // Write the nulls if any + // + if (nullRunLength > 0) { + int writerIndex = + ((BaseDataValueVector) valueVec).getBuffer().writerIndex(); + castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math + .ceil(nullRunLength * dataTypeLengthInBits / 8.0)); + writeCount += nullRunLength; + valuesReadInCurrentPass += nullRunLength; + recordsReadInThisIteration += nullRunLength; + } - int writerIndex = ((BaseDataValueVector) valueVec).getBuffer().writerIndex(); - if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){ - castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0)); - } - else if (dataTypeLengthInBits < 8){ - rightBitShift += dataTypeLengthInBits * nullsFound; + // + // Handle the run of non-null values + // + haveMoreData = readCount < recordsToReadInThisPass + && writeCount + runLength < valueVec.getValueCapacity() + // note: writeCount+runLength + && definitionLevelsRead < pageReader.currentPageCount; + while (haveMoreData && currentDefinitionLevel >= columnDescriptor + .getMaxDefinitionLevel()) { + readCount++; + runLength++; + definitionLevelsRead++; + castedVectorMutator.setIndexDefined(writeCount + runLength + - 1); //set the nullable bit to indicate a non-null value + haveMoreData = readCount < recordsToReadInThisPass + && writeCount + runLength < valueVec.getValueCapacity() + && definitionLevelsRead < pageReader.currentPageCount; + if (haveMoreData) { + currentDefinitionLevel = pageReader.definitionLevels.readInteger(); } - this.recordsReadInThisIteration = runLength; + } + // + // Write the non-null values + // + if (runLength > 0) { // set up metadata + + // This _must_ be set so that the call to readField works correctly for all datatypes + this.recordsReadInThisIteration += runLength; + this.readStartInBytes = pageReader.readPosInBytes; - this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + this.readLengthInBits = runLength * dataTypeLengthInBits; this.readLength = (int) Math.ceil(readLengthInBits / 8.0); - readField( runLength); - recordsReadInThisIteration += nullsFound; - valuesReadInCurrentPass += runLength; - totalValuesRead += recordsReadInThisIteration; - pageReader.valuesRead += recordsReadInThisIteration; - pageReader.readPosInBytes = readStartInBytes + readLength; + readField(runLength); + + writeCount += runLength; + valuesReadInCurrentPass += runLength; } - valuesReadInCurrentPass = indexInOutputVector; - valueVec.getMutator().setValueCount( - valuesReadInCurrentPass); + + pageReader.valuesRead += recordsReadInThisIteration; + pageReader.readPosInBytes = readStartInBytes + readLength; + + totalValuesRead += runLength + nullRunLength; + + logger.trace("" + "recordsToReadInThisPass: {} \t " + + "Run Length: {} \t Null Run Length: {} \t readCount: {} \t writeCount: {} \t " + + "recordsReadInThisIteration: {} \t valuesReadInCurrentPass: {} \t " + + "totalValuesRead: {} \t readStartInBytes: {} \t readLength: {} \t pageReader.byteLength: {} \t " + + "definitionLevelsRead: {} \t pageReader.currentPageCount: {}", + recordsToReadInThisPass, runLength, nullRunLength, readCount, + writeCount, recordsReadInThisIteration, valuesReadInCurrentPass, + totalValuesRead, readStartInBytes, readLength, pageReader.byteLength, + definitionLevelsRead, pageReader.currentPageCount); + + } + + valueVec.getMutator().setValueCount(valuesReadInCurrentPass); } - @Override + @Override protected abstract void readField(long recordsToRead); } http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index cf4a643..4069735 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -260,7 +260,8 @@ public class TestParquetWriter extends BaseTestQuery { "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38"; String validateSelection = "decimal8, decimal15, decimal24, decimal38"; String inputTable = "cp.`employee.json`"; - runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal"); + runTestAndValidate(selection, validateSelection, inputTable, + "parquet_decimal"); } @Test @@ -327,12 +328,17 @@ public class TestParquetWriter extends BaseTestQuery { testBuilder() .ordered() .sqlQuery(query) - .optionSettingQueriesForTestQuery("alter system set `store.parquet.use_new_reader` = false") + .optionSettingQueriesForTestQuery( + "alter system set `store.parquet.use_new_reader` = false") .sqlBaselineQuery(query) - .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true") + .optionSettingQueriesForBaseline( + "alter system set `store.parquet.use_new_reader` = true") .build().run(); } finally { - test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR.getDefault().bool_val); + test("alter system set `%s` = %b", + ExecConstants.PARQUET_NEW_RECORD_READER, + ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR + .getDefault().bool_val); } } @@ -344,12 +350,17 @@ public class TestParquetWriter extends BaseTestQuery { .ordered() .highPerformanceComparison() .sqlQuery(query) - .optionSettingQueriesForTestQuery("alter system set `store.parquet.use_new_reader` = false") + .optionSettingQueriesForTestQuery( + "alter system set `store.parquet.use_new_reader` = false") .sqlBaselineQuery(query) - .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true") + .optionSettingQueriesForBaseline( + "alter system set `store.parquet.use_new_reader` = true") .build().run(); } finally { - test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR.getDefault().bool_val); + test("alter system set `%s` = %b", + ExecConstants.PARQUET_NEW_RECORD_READER, + ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR + .getDefault().bool_val); } } @@ -368,25 +379,29 @@ public class TestParquetWriter extends BaseTestQuery { @Ignore @Test public void testParquetRead_checkNulls_NullsFirst() throws Exception { - compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`"); + compareParquetReadersColumnar("*", + "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`"); } @Ignore @Test public void testParquetRead_checkNulls() throws Exception { - compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`"); + compareParquetReadersColumnar("*", + "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`"); } @Ignore @Test public void test958_sql() throws Exception { - compareParquetReadersHyperVector("ss_ext_sales_price", "dfs.`/tmp/store_sales`"); + compareParquetReadersHyperVector("ss_ext_sales_price", + "dfs.`/tmp/store_sales`"); } @Ignore @Test public void testReadSf_1_supplier() throws Exception { - compareParquetReadersHyperVector("*", "dfs.`/tmp/orders_part-m-00001.parquet`"); + compareParquetReadersHyperVector("*", + "dfs.`/tmp/orders_part-m-00001.parquet`"); } @Ignore @@ -410,7 +425,8 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testDrill_1314_all_columns() throws Exception { compareParquetReadersHyperVector("*", "dfs.`/tmp/drill_1314.parquet`"); - compareParquetReadersColumnar("l_orderkey,l_partkey,l_suppkey,l_linenumber, l_quantity, l_extendedprice,l_discount,l_tax", + compareParquetReadersColumnar( + "l_orderkey,l_partkey,l_suppkey,l_linenumber, l_quantity, l_extendedprice,l_discount,l_tax", "dfs.`/tmp/drill_1314.parquet`"); } @@ -666,7 +682,7 @@ public class TestParquetWriter extends BaseTestQuery { File dir = new File("target/" + this.getClass()); if ((!dir.exists() && !dir.mkdirs()) || (dir.exists() && !dir.isDirectory())) { throw new RuntimeException("can't create dir " + dir); - } + } File input1 = new File(dir, "1.json"); File input2 = new File(dir, "2.json"); try (FileWriter fw = new FileWriter(input1)) { @@ -678,4 +694,50 @@ public class TestParquetWriter extends BaseTestQuery { test("select * from " + "dfs.`" + dir.getAbsolutePath() + "`"); runTestAndValidate("*", "*", "dfs.`" + dir.getAbsolutePath() + "`", "schema_change_parquet"); } + + +/* + The following test boundary conditions for null values occurring on page boundaries. All files have at least one dictionary + encoded page for all columns + */ + @Test + public void testAllNulls() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/all_nulls.parquet`"); + } + + @Test + public void testNoNulls() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/no_nulls.parquet`"); + } + + @Test + public void testFirstPageAllNulls() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/first_page_all_nulls.parquet`"); + } + @Test + public void testLastPageAllNulls() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/first_page_all_nulls.parquet`"); + } + @Test + public void testFirstPageOneNull() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/first_page_one_null.parquet`"); + } + @Test + public void testLastPageOneNull() throws Exception { + compareParquetReadersColumnar( + "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean", + "cp.`parquet/last_page_one_null.parquet`"); + } + } + http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/all_nulls.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/all_nulls.parquet new file mode 100644 index 0000000..82f5ccb Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/all_nulls.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet new file mode 100644 index 0000000..a5db8af Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet b/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet new file mode 100644 index 0000000..5df7c1c Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet new file mode 100644 index 0000000..375ec6c Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet b/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet new file mode 100644 index 0000000..5df7c1c Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet differ http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/no_nulls.parquet ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/parquet/no_nulls.parquet b/exec/java-exec/src/test/resources/parquet/no_nulls.parquet new file mode 100644 index 0000000..450414a Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/no_nulls.parquet differ