sunchao commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r839059318



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -210,48 +213,358 @@ private void readBatchInternal(
       } else if (rowId > rangeEnd) {
         state.nextRange();
       } else {
-        // the range [rowId, rowId + n) overlaps with the current row range in 
state
+        // The range [rowId, rowId + n) overlaps with the current row range in 
state
         long start = Math.max(rangeStart, rowId);
         long end = Math.min(rangeEnd, rowId + n - 1);
 
-        // skip the part [rowId, start)
+        // Skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
           skipValues(toSkip, state, valueReader, updater);
           rowId += toSkip;
           leftInPage -= toSkip;
         }
 
-        // read the part [start, end]
+        // Read the part [start, end]
         n = (int) (end - start + 1);
 
         switch (mode) {
           case RLE:
             if (currentValue == state.maxDefinitionLevel) {
-              updater.readValues(n, offset, values, valueReader);
-            } else {
-              nulls.putNulls(offset, n);
+              updater.readValues(n, state.valueOffset, values, valueReader);
+              state.valueOffset += n;
+            } else if (!state.isRequired && currentValue == 
state.maxDefinitionLevel - 1) {
+              // Only add null if this represents a null element, but not for 
the case where a
+              // struct itself is null
+              nulls.putNulls(state.valueOffset, n);
+              state.valueOffset += n;
             }
+            defLevels.putInts(state.levelOffset, n, currentValue);
             break;
           case PACKED:
             for (int i = 0; i < n; ++i) {
-              if (currentBuffer[currentBufferIdx++] == 
state.maxDefinitionLevel) {
-                updater.readValue(offset + i, values, valueReader);
+              int value = currentBuffer[currentBufferIdx++];
+              if (value == state.maxDefinitionLevel) {
+                updater.readValue(state.valueOffset++, values, valueReader);
               } else {
-                nulls.putNull(offset + i);
+                // Only add null if this represents a null element, but not 
for the case where a
+                // struct itself is null
+                nulls.putNull(state.valueOffset++);
               }
+              defLevels.putInt(state.levelOffset + i, value);
             }
             break;
         }
-        offset += n;
+        state.levelOffset += n;
         leftInBatch -= n;
         rowId += n;
         leftInPage -= n;
         currentCount -= n;
+        defLevels.addElementsAppended(n);
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  public void readBatchNested(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+    readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, 
values, values, true,
+      valueReader, updater);
+  }
+
+  public void readIntegersNested(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader) {
+    readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, 
values, nulls, false,
+      valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater());
+  }
+
+  /**
+   * Keep reading repetition level values from the page until either: 1) we've 
read enough

Review comment:
       No, it reads definition levels & values too. It's just the number of 
rep_level/def_level/value to read is controlled by repetition levels.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to