sunchao commented on a change in pull request #33006:
URL: https://github.com/apache/spark/pull/33006#discussion_r655914360



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -174,24 +162,29 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
       // page.
       dictionaryIds = column.reserveDictionaryIds(total);
     }
-    while (total > 0) {
+    readState.resetForBatch(total);
+    while (readState.valuesToReadInBatch > 0) {
       // Compute the number of values we want to read in this page.
-      int leftInPage = (int) (endOfPageValueCount - valuesRead);
-      if (leftInPage == 0) {
+      if (readState.valuesToReadInPage == 0) {
         readPage();
-        leftInPage = (int) (endOfPageValueCount - valuesRead);
+        readState.resetForPage(pageValueCount);
       }
-      int num = Math.min(total, leftInPage);
       PrimitiveType.PrimitiveTypeName typeName =
           descriptor.getPrimitiveType().getPrimitiveTypeName();
       if (isCurrentPageDictionaryEncoded) {
+        boolean supportLazyDecoding = readState.offset == 0 &&
+          isLazyDecodingSupported(typeName);
+
+        // Save starting offset in case we need to decode dictionary IDs.
+        int startOffset = readState.offset;
+
         // Read and decode dictionary ids.
-        defColumn.readIntegers(
-            num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+        defColumn.readIntegers(readState, dictionaryIds, column,
+          (VectorizedValuesReader) dataColumn);
 
         // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
         // the values to add microseconds precision.
-        if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
+        if (column.hasDictionary() || supportLazyDecoding) {

Review comment:
       Thanks. I think we can just do:
   ```java
   if (column.hasDictionary() || (startOffset == 0 && isLazyDecodingSupported(typeName)))
   ```
   and it should have the same effect.
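
   As a minimal sketch (not the actual patch), assuming the surrounding code from the hunk above, the branch would then read:
   ```java
   // Save the starting offset before `readIntegers` advances `readState.offset`.
   int startOffset = readState.offset;

   // Read and decode dictionary ids.
   defColumn.readIntegers(readState, dictionaryIds, column,
       (VectorizedValuesReader) dataColumn);

   // Inline check: same effect as a `supportLazyDecoding` flag computed up front,
   // since `startOffset` was captured before the read moved the offset.
   if (column.hasDictionary() || (startOffset == 0 && isLazyDecodingSupported(typeName))) {
     // lazy dictionary decoding path (unchanged)
   }
   ```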

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -199,48 +200,51 @@ public void readBatch(
           break;
       }
       offset += n;
-      left -= n;
       currentCount -= n;
     }
+
+    state.advanceOffset(offset);
   }
 
   /**
    * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
    * populated into `nulls`.
    */
   public void readIntegers(
-      int total,
+      ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
-      int rowId,
-      int level,
       VectorizedValuesReader data) throws IOException {
-    int left = total;
-    while (left > 0) {
+    int offset = state.offset;
+
+    while (state.hasMoreInPage(offset)) {

Review comment:
       I don't have a strong opinion on this. It's just that in the column index PR
we'll introduce `rowId`, and instead of `left` we'll need something like
`valuesLeft` and `rowsLeft`. In addition, there will be several branches where we
need to update `rowsLeft`, so it'll be something like:
   ```java
   int offset = state.offset;
   long rowId = state.rowId;
   int valuesLeft = state.valuesToReadInBatch;
   int rowsLeft = state.valuesToReadInPage;
   while (valuesLeft > 0 && rowsLeft > 0) {
      if (...) {
       rowId += n;
       rowsLeft -= n;
     } else {
       ...
       if (toSkip > 0) {
         ...
         rowId += n;
         rowsLeft -= n;
       }
       ...    
       offset += n;
       valuesLeft -= n;
       rowId += n;
       rowsLeft -= n;
     }
   }
   ```
   so it's slightly more complicated.
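
   To make the state bookkeeping concrete, here is a rough sketch (assumed, not the actual class in the PR) of what `ParquetReadState` could track, using only the field and method names that appear in the hunks above; the method bodies are my guesses for illustration:
   ```java
   // Hypothetical sketch of the mutable read state shared between
   // VectorizedColumnReader and VectorizedRleValuesReader.
   final class ParquetReadState {
     /** Values remaining to be read in the current batch. */
     int valuesToReadInBatch;
     /** Values remaining to be read in the current Parquet page. */
     int valuesToReadInPage;
     /** Current write offset into the output column vector. */
     int offset;

     void resetForBatch(int batchSize) {
       this.offset = 0;
       this.valuesToReadInBatch = batchSize;
     }

     void resetForPage(int pageValueCount) {
       this.valuesToReadInPage = pageValueCount;
     }

     /** Whether more values can be read before the batch or the page is exhausted. */
     boolean hasMoreInPage(int newOffset) {
       int consumed = newOffset - offset;
       return consumed < valuesToReadInBatch && consumed < valuesToReadInPage;
     }

     /** Record how many values were consumed since `offset` was last updated. */
     void advanceOffset(int newOffset) {
       int consumed = newOffset - offset;
       this.valuesToReadInBatch -= consumed;
       this.valuesToReadInPage -= consumed;
       this.offset = newOffset;
     }
   }
   ```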
       
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


