viirya commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r656570701



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {
+  private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+  private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  /** Iterator over all row ranges, only not-null if column index is present */
+  private final Iterator<RowRange> rowRanges;
+
+  /** The current row range */
+  private RowRange currentRange;
+
+  /** Maximum definition level */
+  int maxDefinitionLevel;
+
+  /** The current index overall all rows within the column chunk. This is used 
to check if the
+   * current row should be skipped by comparing the index against the row 
ranges. */
+  long rowId;
+
+  /** The offset to add the next value in the current batch */
+  int offset;
+
+  /** The remaining number of values to read in the current page */
+  int valuesToReadInPage;
+
+  /** The remaining number of values to read in the current batch */
+  int valuesToReadInBatch;
+
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
+    this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
+  }
+
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong 
rowIndexes) {
+    List<RowRange> rowRanges = new ArrayList<>();
+    long currentStart, previous;
+    currentStart = previous = Long.MIN_VALUE;
+
+    while (rowIndexes.hasNext()) {
+      long idx = rowIndexes.nextLong();
+      if (previous == Long.MIN_VALUE) {
+        currentStart = previous = idx;
+      } else if (previous + 1 != idx) {
+        RowRange range = new RowRange(currentStart, previous);
+        rowRanges.add(range);
+        currentStart = previous = idx;
+      } else {
+        previous = idx;
+      }
+    }
+
+    if (previous != Long.MIN_VALUE) {
+      rowRanges.add(new RowRange(currentStart, previous));
+    }
+
+    return rowRanges.iterator();
+  }
+
+  /**
+   * Called at the beginning of reading a new batch.
+   */
+  void resetForBatch(int batchSize) {
+    this.offset = 0;
+    this.valuesToReadInBatch = batchSize;
+  }
+
+  /**
+   * Called at the beginning of reading a new page.
+   */
+  void resetForPage(int totalValuesInPage, long pageFirstRowIndex) {

Review comment:
       resetForNewPage?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {
+  private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+  private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  /** Iterator over all row ranges, only not-null if column index is present */
+  private final Iterator<RowRange> rowRanges;
+
+  /** The current row range */
+  private RowRange currentRange;
+
+  /** Maximum definition level */
+  int maxDefinitionLevel;
+
+  /** The current index overall all rows within the column chunk. This is used 
to check if the
+   * current row should be skipped by comparing the index against the row 
ranges. */
+  long rowId;
+
+  /** The offset to add the next value in the current batch */
+  int offset;
+
+  /** The remaining number of values to read in the current page */
+  int valuesToReadInPage;
+
+  /** The remaining number of values to read in the current batch */
+  int valuesToReadInBatch;
+
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
+    this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
+  }
+
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong 
rowIndexes) {
+    List<RowRange> rowRanges = new ArrayList<>();
+    long currentStart, previous;
+    currentStart = previous = Long.MIN_VALUE;
+
+    while (rowIndexes.hasNext()) {
+      long idx = rowIndexes.nextLong();
+      if (previous == Long.MIN_VALUE) {
+        currentStart = previous = idx;
+      } else if (previous + 1 != idx) {
+        RowRange range = new RowRange(currentStart, previous);
+        rowRanges.add(range);
+        currentStart = previous = idx;
+      } else {
+        previous = idx;
+      }
+    }
+
+    if (previous != Long.MIN_VALUE) {
+      rowRanges.add(new RowRange(currentStart, previous));
+    }
+
+    return rowRanges.iterator();
+  }
+
+  /**
+   * Called at the beginning of reading a new batch.
+   */
+  void resetForBatch(int batchSize) {

Review comment:
       resetForNewBatch?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -174,24 +171,29 @@ void readBatch(int total, WritableColumnVector column) 
throws IOException {
       // page.
       dictionaryIds = column.reserveDictionaryIds(total);
     }
-    while (total > 0) {
+    readState.resetForBatch(total);
+    while (readState.valuesToReadInBatch > 0) {
       // Compute the number of values we want to read in this page.
-      int leftInPage = (int) (endOfPageValueCount - valuesRead);
-      if (leftInPage == 0) {
+      if (readState.valuesToReadInPage == 0) {
         readPage();
-        leftInPage = (int) (endOfPageValueCount - valuesRead);
+        readState.resetForPage(pageValueCount, pageFirstRowIndex);
       }
-      int num = Math.min(total, leftInPage);
       PrimitiveType.PrimitiveTypeName typeName =
           descriptor.getPrimitiveType().getPrimitiveTypeName();
       if (isCurrentPageDictionaryEncoded) {
+        boolean supportLazyDecoding = readState.rowId == pageFirstRowIndex &&

Review comment:
       Why is `readState.rowId == pageFirstRowIndex` a condition for supporting 
lazy decoding?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -174,24 +171,29 @@ void readBatch(int total, WritableColumnVector column) 
throws IOException {
       // page.
       dictionaryIds = column.reserveDictionaryIds(total);
     }
-    while (total > 0) {
+    readState.resetForBatch(total);
+    while (readState.valuesToReadInBatch > 0) {
       // Compute the number of values we want to read in this page.

Review comment:
       This comment is out of date now that `leftInPage` has been removed.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -170,77 +170,135 @@ public int readInteger() {
    *  }
    */
   public void readBatch(
-      int total,
-      int offset,
+      ParquetReadState state,
       WritableColumnVector values,
-      int maxDefinitionLevel,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
-    int left = total;
-    while (left > 0) {
+    int offset = state.offset;
+    long rowId = state.rowId;
+
+    while (state.hasMoreInPage(offset, rowId)) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-      switch (mode) {
-        case RLE:
-          if (currentValue == maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(state.valuesToReadInBatch + state.offset - offset, 
this.currentCount);

Review comment:
       Is `n` the number of values to read?
   
   Isn't it just `Math.min(state.valuesToReadInBatch - offset, 
this.currentCount)`?
   
   Why does `state.offset` need to be added?
   
   
   

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -95,6 +80,18 @@
    */
   private final ParquetVectorUpdaterFactory updaterFactory;
 
+  /**
+   * Helper struct to track intermediate states while reading Parquet pages in 
the column chunk.
+   */
+  private final ParquetReadState readState;
+
+  /**
+   * The index for the first row in the current page, among all rows across 
all pages in the
+   * column chunk for this reader. The value for this is 0 if there is no 
column index for the

Review comment:
       If there is a column index, can't `pageFirstRowIndex` also be 0 when the 
page happens to start at exactly the first row to read?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -294,6 +292,7 @@ private void initDataReader(Encoding dataEncoding, 
ByteBufferInputStream in) thr
 
   private void readPageV1(DataPageV1 page) throws IOException {
     this.pageValueCount = page.getValueCount();
+    this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

Review comment:
       Same question as above: can't `getFirstRowIndex` return 0? Or does the 
index begin at 1?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to