Repository: drill
Updated Branches:
  refs/heads/master e7db9dcac -> ca5a8476f


DRILL-3871: Off by one error while reading binary fields with one terminal null in parquet.
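
The previous implementation tracked skipped nulls with bit-level bookkeeping
(nullsFound, rightBitShift, bitsUsed) that could leave the write position off
by one when a run of defined values ended in a single null, corrupting
variable-length (binary) fields. The rewrite below drops that bookkeeping in
favor of explicit read/write cursors, processing each page as alternating
runs of nulls and defined values. A minimal sketch of the run-splitting idea,
with illustrative names only (this is not Drill's actual reader API):

    // Sketch: split one page of definition levels into alternating runs.
    // For a flat optional column, a level below the max means "null".
    public class RunSplitSketch {
      static void splitRuns(int[] defLevels, int maxDefLevel) {
        int i = 0;
        while (i < defLevels.length) {
          int nullRun = 0;    // records to skip, leaving space in the vector
          while (i < defLevels.length && defLevels[i] < maxDefLevel) { i++; nullRun++; }
          int definedRun = 0; // records to copy as one contiguous block
          while (i < defLevels.length && defLevels[i] >= maxDefLevel) { i++; definedRun++; }
          System.out.println("nulls=" + nullRun + " defined=" + definedRun);
        }
      }

      public static void main(String[] args) {
        // The DRILL-3871 trigger: defined values followed by one terminal null.
        splitRuns(new int[] {1, 1, 1, 1, 0}, 1); // nulls=0 defined=4, then nulls=1 defined=0
      }
    }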


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ca5a8476
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ca5a8476
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ca5a8476

Branch: refs/heads/master
Commit: ca5a8476fb67d1c6b51472ddd48d7d51bbb3703b
Parents: e7db9dc
Author: Parth Chandra <par...@apache.org>
Authored: Mon Oct 5 10:25:56 2015 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Mon Nov 2 14:46:05 2015 -0800

----------------------------------------------------------------------
 .../columnreaders/NullableColumnReader.java     | 203 +++++++++++--------
 .../physical/impl/writer/TestParquetWriter.java |  88 ++++++--
 .../test/resources/parquet/all_nulls.parquet    | Bin 0 -> 1258 bytes
 .../parquet/first_page_all_nulls.parquet        | Bin 0 -> 2585 bytes
 .../parquet/first_page_one_null.parquet         | Bin 0 -> 3899 bytes
 .../parquet/last_page_all_nulls.parquet         | Bin 0 -> 3796 bytes
 .../parquet/last_page_one_null.parquet          | Bin 0 -> 3899 bytes
 .../src/test/resources/parquet/no_nulls.parquet | Bin 0 -> 4706 bytes
 8 files changed, 194 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index d721601..9db87f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -29,117 +29,152 @@ import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{
-
-  int nullsFound;
-  // used to skip nulls found
-  int rightBitShift;
-  // used when copying less than a byte worth of data at a time, to indicate the number of used bits in the current byte
-  int bitsUsed;
-  BaseDataValueVector castedBaseVector;
-  NullableVectorDefinitionSetter castedVectorMutator;
-  long definitionLevelsRead;
-  long totalDefinitionLevelsRead;
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class);
+  protected BaseDataValueVector castedBaseVector;
+  protected NullableVectorDefinitionSetter castedVectorMutator;
+  private long definitionLevelsRead = 0;
 
  NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
               boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     castedBaseVector = (BaseDataValueVector) v;
     castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
-    totalDefinitionLevelsRead = 0;
   }
 
-
-  @Override
-  public void processPages(long recordsToReadInThisPass) throws IOException {
-    int indexInOutputVector = 0;
+  @Override public void processPages(long recordsToReadInThisPass)
+      throws IOException {
     readStartInBytes = 0;
     readLength = 0;
     readLengthInBits = 0;
     recordsReadInThisIteration = 0;
     vectorData = castedBaseVector.getBuffer();
 
-      // values need to be spaced out where nulls appear in the column
-      // leaving blank space for nulls allows for random access to values
-      // to optimize copying data out of the buffered disk stream, runs of defined values
-      // are located and copied together, rather than copying individual values
-
-      long runStart = pageReader.readPosInBytes;
-      int runLength;
-      int currentDefinitionLevel;
-      boolean lastValueWasNull;
-      boolean lastRunBrokenByNull = false;
-      while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector < valueVec.getValueCapacity()){
-        // read a page if needed
+    // values need to be spaced out where nulls appear in the column
+    // leaving blank space for nulls allows for random access to values
+    // to optimize copying data out of the buffered disk stream, runs of defined values
+    // are located and copied together, rather than copying individual values
+
+    int runLength = -1;     // number of non-null records in this pass.
+    int nullRunLength = -1; // number of consecutive null records that we read.
+    int currentDefinitionLevel = -1;
+    int readCount = 0; // the record number we last read.
+    int writeCount = 0; // the record number we last wrote to the value vector.
+                        // This was previously the indexInOutputVector variable
+    boolean haveMoreData; // true if we have more data and have not filled the vector
+
+    while (readCount < recordsToReadInThisPass && writeCount < valueVec.getValueCapacity()) {
+      // read a page if needed
       if (!pageReader.hasPage()
-            || ((readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) &&
-          definitionLevelsRead >= pageReader.currentPageCount)) {
-          if (!pageReader.next()) {
-            break;
-          }
-          definitionLevelsRead = 0;
+          || (definitionLevelsRead >= pageReader.currentPageCount)) {
+        if (!pageReader.next()) {
+          break;
         }
-        lastValueWasNull = true;
-        runLength = 0;
-        if (lastRunBrokenByNull ) {
-          nullsFound = 1;
-          lastRunBrokenByNull = false;
-        } else  {
-          nullsFound = 0;
-        }
-        // loop to find the longest run of defined values available, can be preceded by several nulls
-        while(indexInOutputVector < recordsToReadInThisPass
-            && indexInOutputVector < valueVec.getValueCapacity()
-          && definitionLevelsRead < pageReader.currentPageCount) {
+        //New page. Reset the definition level.
+        currentDefinitionLevel = -1;
+        definitionLevelsRead = 0;
+        recordsReadInThisIteration = 0;
+      }
+
+      nullRunLength = 0;
+      runLength = 0;
+
+      //
+      // Let's skip the next run of nulls if any ...
+      //
+
+      // If we are reentering this loop, the currentDefinitionLevel has already been read
+      if (currentDefinitionLevel < 0) {
+        currentDefinitionLevel = pageReader.definitionLevels.readInteger();
+      }
+      haveMoreData = readCount < recordsToReadInThisPass
+          && writeCount + nullRunLength < valueVec.getValueCapacity()
+          && definitionLevelsRead < pageReader.currentPageCount;
+      while (haveMoreData && currentDefinitionLevel < columnDescriptor
+          .getMaxDefinitionLevel()) {
+        readCount++;
+        nullRunLength++;
+        definitionLevelsRead++;
+        haveMoreData = readCount < recordsToReadInThisPass
+            && writeCount + nullRunLength < valueVec.getValueCapacity()
+            && definitionLevelsRead < pageReader.currentPageCount;
+        if (haveMoreData) {
           currentDefinitionLevel = pageReader.definitionLevels.readInteger();
-          definitionLevelsRead++;
-          indexInOutputVector++;
-          totalDefinitionLevelsRead++;
-          if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
-            // a run of non-null values was found, break out of this loop to do a read in the outer loop
-            if ( ! lastValueWasNull ){
-              lastRunBrokenByNull = true;
-              break;
-            }
-            nullsFound++;
-            lastValueWasNull = true;
-          }
-          else{
-            if (lastValueWasNull){
-              runLength = 0;
-              lastValueWasNull = false;
-            }
-            runLength++;
-            castedVectorMutator.setIndexDefined(indexInOutputVector - 1);
-          }
         }
-        valuesReadInCurrentPass += nullsFound;
+      }
+      //
+      // Write the nulls if any
+      //
+      if (nullRunLength > 0) {
+        int writerIndex =
+            ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
+        castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math
+            .ceil(nullRunLength * dataTypeLengthInBits / 8.0));
+        writeCount += nullRunLength;
+        valuesReadInCurrentPass += nullRunLength;
+        recordsReadInThisIteration += nullRunLength;
+      }
 
-        int writerIndex = ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
-        if ( dataTypeLengthInBits > 8  || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
-          castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
-        }
-        else if (dataTypeLengthInBits < 8){
-          rightBitShift += dataTypeLengthInBits * nullsFound;
+      //
+      // Handle the run of non-null values
+      //
+      haveMoreData = readCount < recordsToReadInThisPass
+          && writeCount + runLength < valueVec.getValueCapacity()
+          // note: writeCount+runLength
+          && definitionLevelsRead < pageReader.currentPageCount;
+      while (haveMoreData && currentDefinitionLevel >= columnDescriptor
+          .getMaxDefinitionLevel()) {
+        readCount++;
+        runLength++;
+        definitionLevelsRead++;
+        castedVectorMutator.setIndexDefined(writeCount + runLength
+            - 1); //set the nullable bit to indicate a non-null value
+        haveMoreData = readCount < recordsToReadInThisPass
+            && writeCount + runLength < valueVec.getValueCapacity()
+            && definitionLevelsRead < pageReader.currentPageCount;
+        if (haveMoreData) {
+          currentDefinitionLevel = pageReader.definitionLevels.readInteger();
         }
-        this.recordsReadInThisIteration = runLength;
+      }
 
+      //
+      // Write the non-null values
+      //
+      if (runLength > 0) {
         // set up metadata
+
+        // This _must_ be set so that the call to readField works correctly for all datatypes
+        this.recordsReadInThisIteration += runLength;
+
         this.readStartInBytes = pageReader.readPosInBytes;
-        this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+        this.readLengthInBits = runLength * dataTypeLengthInBits;
         this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-        readField( runLength);
-        recordsReadInThisIteration += nullsFound;
-        valuesReadInCurrentPass += runLength;
-        totalValuesRead += recordsReadInThisIteration;
-        pageReader.valuesRead += recordsReadInThisIteration;
 
-        pageReader.readPosInBytes = readStartInBytes + readLength;
+        readField(runLength);
+
+        writeCount += runLength;
+        valuesReadInCurrentPass += runLength;
       }
-    valuesReadInCurrentPass = indexInOutputVector;
-    valueVec.getMutator().setValueCount(
-        valuesReadInCurrentPass);
+
+      pageReader.valuesRead += recordsReadInThisIteration;
+      pageReader.readPosInBytes = readStartInBytes + readLength;
+
+      totalValuesRead += runLength + nullRunLength;
+
+      logger.trace("" + "recordsToReadInThisPass: {} \t "
+              + "Run Length: {} \t Null Run Length: {} \t readCount: {} \t writeCount: {} \t "
+              + "recordsReadInThisIteration: {} \t valuesReadInCurrentPass: {} \t "
+              + "totalValuesRead: {} \t readStartInBytes: {} \t readLength: {} \t pageReader.byteLength: {} \t "
+              + "definitionLevelsRead: {} \t pageReader.currentPageCount: {}",
+          recordsToReadInThisPass, runLength, nullRunLength, readCount,
+          writeCount, recordsReadInThisIteration, valuesReadInCurrentPass,
+          totalValuesRead, readStartInBytes, readLength, pageReader.byteLength,
+          definitionLevelsRead, pageReader.currentPageCount);
+
+    }
+
+    valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
   }
 
-  @Override
+    @Override
   protected abstract void readField(long recordsToRead);
 }
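
A note on the setIndex arithmetic above: values are spaced out in the vector
where nulls appear, so skipping a run of nulls only advances the writer index
by the run's width in bytes. A hedged sketch of that arithmetic (the helper
name is hypothetical; the expression mirrors the Math.ceil call in the diff):

    // Hypothetical helper: bytes to advance the writer index when
    // nullRunLength consecutive nulls are skipped.
    static int nullRunByteAdvance(int nullRunLength, int dataTypeLengthInBits) {
      return (int) Math.ceil(nullRunLength * dataTypeLengthInBits / 8.0);
    }
    // e.g. 5 nulls in a 1-bit column  -> ceil(5 * 1 / 8.0)  = 1 byte
    //      3 nulls in a 32-bit column -> ceil(3 * 32 / 8.0) = 12 bytes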

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index cf4a643..4069735 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -260,7 +260,8 @@ public class TestParquetWriter extends BaseTestQuery {
         "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38";
     String validateSelection = "decimal8, decimal15, decimal24, decimal38";
     String inputTable = "cp.`employee.json`";
-    runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
+    runTestAndValidate(selection, validateSelection, inputTable,
+        "parquet_decimal");
   }
 
   @Test
@@ -327,12 +328,17 @@ public class TestParquetWriter extends BaseTestQuery {
       testBuilder()
         .ordered()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter system set `store.parquet.use_new_reader` = false")
+        .optionSettingQueriesForTestQuery(
+            "alter system set `store.parquet.use_new_reader` = false")
         .sqlBaselineQuery(query)
-        .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true")
+        .optionSettingQueriesForBaseline(
+            "alter system set `store.parquet.use_new_reader` = true")
         .build().run();
     } finally {
-      test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR.getDefault().bool_val);
+      test("alter system set `%s` = %b",
+          ExecConstants.PARQUET_NEW_RECORD_READER,
+          ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR
+              .getDefault().bool_val);
     }
   }
 
@@ -344,12 +350,17 @@ public class TestParquetWriter extends BaseTestQuery {
         .ordered()
         .highPerformanceComparison()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery("alter system set `store.parquet.use_new_reader` = false")
+        .optionSettingQueriesForTestQuery(
+            "alter system set `store.parquet.use_new_reader` = false")
         .sqlBaselineQuery(query)
-        .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true")
+        .optionSettingQueriesForBaseline(
+            "alter system set `store.parquet.use_new_reader` = true")
         .build().run();
     } finally {
-      test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR.getDefault().bool_val);
+      test("alter system set `%s` = %b",
+          ExecConstants.PARQUET_NEW_RECORD_READER,
+          ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR
+              .getDefault().bool_val);
     }
   }
 
@@ -368,25 +379,29 @@ public class TestParquetWriter extends BaseTestQuery {
   @Ignore
   @Test
   public void testParquetRead_checkNulls_NullsFirst() throws Exception {
-    compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
+    compareParquetReadersColumnar("*",
+        "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
   }
 
   @Ignore
   @Test
   public void testParquetRead_checkNulls() throws Exception {
-    compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`");
+    compareParquetReadersColumnar("*",
+        "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`");
   }
 
   @Ignore
   @Test
   public void test958_sql() throws Exception {
-    compareParquetReadersHyperVector("ss_ext_sales_price", "dfs.`/tmp/store_sales`");
+    compareParquetReadersHyperVector("ss_ext_sales_price",
+        "dfs.`/tmp/store_sales`");
   }
 
   @Ignore
   @Test
   public void testReadSf_1_supplier() throws Exception {
-    compareParquetReadersHyperVector("*", "dfs.`/tmp/orders_part-m-00001.parquet`");
+    compareParquetReadersHyperVector("*",
+        "dfs.`/tmp/orders_part-m-00001.parquet`");
   }
 
   @Ignore
@@ -410,7 +425,8 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testDrill_1314_all_columns() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/drill_1314.parquet`");
-    compareParquetReadersColumnar("l_orderkey,l_partkey,l_suppkey,l_linenumber, l_quantity, l_extendedprice,l_discount,l_tax",
+    compareParquetReadersColumnar(
+        "l_orderkey,l_partkey,l_suppkey,l_linenumber, l_quantity, l_extendedprice,l_discount,l_tax",
         "dfs.`/tmp/drill_1314.parquet`");
   }
 
@@ -666,7 +682,7 @@ public class TestParquetWriter extends BaseTestQuery {
     File dir = new File("target/" + this.getClass());
    if ((!dir.exists() && !dir.mkdirs()) || (dir.exists() && !dir.isDirectory())) {
       throw new RuntimeException("can't create dir " + dir);
-    }
+  }
     File input1 = new File(dir, "1.json");
     File input2 = new File(dir, "2.json");
     try (FileWriter fw = new FileWriter(input1)) {
@@ -678,4 +694,50 @@ public class TestParquetWriter extends BaseTestQuery {
     test("select * from " + "dfs.`" + dir.getAbsolutePath() + "`");
    runTestAndValidate("*", "*", "dfs.`" + dir.getAbsolutePath() + "`", "schema_change_parquet");
   }
+
+
+/*
+  The following tests check boundary conditions for null values occurring on page boundaries. All files have at least one dictionary
+  encoded page for all columns
+  */
+  @Test
+  public void testAllNulls() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/all_nulls.parquet`");
+  }
+
+  @Test
+  public void testNoNulls() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/no_nulls.parquet`");
+  }
+
+  @Test
+  public void testFirstPageAllNulls() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/first_page_all_nulls.parquet`");
+  }
+  @Test
+  public void testLastPageAllNulls() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/last_page_all_nulls.parquet`");
+  }
+  @Test
+  public void testFirstPageOneNull() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/first_page_one_null.parquet`");
+  }
+  @Test
+  public void testLastPageOneNull() throws Exception {
+    compareParquetReadersColumnar(
+        "c_varchar, c_integer, c_bigint, c_float, c_double, c_date, c_time, c_timestamp, c_boolean",
+        "cp.`parquet/last_page_one_null.parquet`");
+  }
+
 }
+
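
The six resource files above pin down where nulls fall relative to page
boundaries. Assuming a flat optional column (max definition level 1, where 0
means null) and, purely for illustration, a page size of four values, the
definition-level patterns would look roughly like this (the real files use
actual Parquet page sizes and nine columns, c_varchar through c_boolean):

    // Illustrative patterns only; the gap marks the page boundary.
    int[] allNulls          = {0,0,0,0,  0,0,0,0};
    int[] noNulls           = {1,1,1,1,  1,1,1,1};
    int[] firstPageAllNulls = {0,0,0,0,  1,1,1,1};
    int[] lastPageAllNulls  = {1,1,1,1,  0,0,0,0};
    int[] firstPageOneNull  = {0,1,1,1,  1,1,1,1};
    int[] lastPageOneNull   = {1,1,1,1,  1,1,1,0};
    // lastPageOneNull ends with the "one terminal null" from the DRILL-3871 title.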

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/all_nulls.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/all_nulls.parquet
new file mode 100644
index 0000000..82f5ccb
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/all_nulls.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet
new file mode 100644
index 0000000..a5db8af
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/first_page_all_nulls.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet b/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet
new file mode 100644
index 0000000..5df7c1c
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/first_page_one_null.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet b/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet
new file mode 100644
index 0000000..375ec6c
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/last_page_all_nulls.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet b/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet
new file mode 100644
index 0000000..5df7c1c
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/last_page_one_null.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/ca5a8476/exec/java-exec/src/test/resources/parquet/no_nulls.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/no_nulls.parquet b/exec/java-exec/src/test/resources/parquet/no_nulls.parquet
new file mode 100644
index 0000000..450414a
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/no_nulls.parquet differ
