cloud-fan commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659962679



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *    c[rowId] = data.readInteger();
-   *  } else {
-   *    c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
 
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipBatch(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;

Review comment:
       do we need to update `leftInBatch` here?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:

Review comment:
       interesting, does the parquet read lib give you a big array containing 
these indexes, or it uses an algorithm to generate the indexes on the fly?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:

Review comment:
       interesting, does the parquet reader lib give you a big array containing 
these indexes, or it uses an algorithm to generate the indexes on the fly?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:

Review comment:
       And how fast/slow the parquet reader lib can generate the indexes?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *    c[rowId] = data.readInteger();
-   *  } else {
-   *    c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
 
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipBatch(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;

Review comment:
       I see, so there are 2 actions here:
   1. get values from the parquet reader
   2. put values into the spark columnar batch.
   
   `skipBatch` does look confusing, we should use a better name, `skipValues`?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *    c[rowId] = data.readInteger();
-   *  } else {
-   *    c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
 
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipBatch(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;

Review comment:
       SGTM, I think `readValues` implies that it puts the values to the spark 
columnar batch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to