sadikovi commented on a change in pull request #34659:
URL: https://github.com/apache/spark/pull/34659#discussion_r824584947



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -210,48 +213,358 @@ private void readBatchInternal(
       } else if (rowId > rangeEnd) {
         state.nextRange();
       } else {
-        // the range [rowId, rowId + n) overlaps with the current row range in state
+        // The range [rowId, rowId + n) overlaps with the current row range in state
         long start = Math.max(rangeStart, rowId);
         long end = Math.min(rangeEnd, rowId + n - 1);
 
-        // skip the part [rowId, start)
+        // Skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
           skipValues(toSkip, state, valueReader, updater);
           rowId += toSkip;
           leftInPage -= toSkip;
         }
 
-        // read the part [start, end]
+        // Read the part [start, end]
         n = (int) (end - start + 1);
 
         switch (mode) {
           case RLE:
             if (currentValue == state.maxDefinitionLevel) {
-              updater.readValues(n, offset, values, valueReader);
-            } else {
-              nulls.putNulls(offset, n);
+              updater.readValues(n, state.valueOffset, values, valueReader);
+              state.valueOffset += n;
+            } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) {
+              // Only add null if this represents a null element, but not for the case where a
+              // struct itself is null
+              nulls.putNulls(state.valueOffset, n);
+              state.valueOffset += n;
             }
+            defLevels.putInts(state.levelOffset, n, currentValue);
             break;
           case PACKED:
             for (int i = 0; i < n; ++i) {
-              if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-                updater.readValue(offset + i, values, valueReader);
+              int value = currentBuffer[currentBufferIdx++];
+              if (value == state.maxDefinitionLevel) {
+                updater.readValue(state.valueOffset++, values, valueReader);
               } else {
-                nulls.putNull(offset + i);
+                // Only add null if this represents a null element, but not for the case where a
+                // struct itself is null
+                nulls.putNull(state.valueOffset++);
               }
+              defLevels.putInt(state.levelOffset + i, value);
             }
             break;
         }
-        offset += n;
+        state.levelOffset += n;
         leftInBatch -= n;
         rowId += n;
         leftInPage -= n;
         currentCount -= n;
+        defLevels.addElementsAppended(n);
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  public void readBatchNested(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+    readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, values, true,
+      valueReader, updater);
+  }
+
+  public void readIntegersNested(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader) {
+    readBatchNestedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false,
+      valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater());
+  }
+
+  /**
+   * Keep reading repetition level values from the page until either: 1) we've read enough
+   * top-level rows to fill the current batch, or 2) we've drained the data page completely.
+   *
+   * @param valuesReused whether 'values' vector is reused for 'nulls'
+   */
+  public void readBatchNestedInternal(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      boolean valuesReused,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    int leftInBatch = state.rowsToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
+    long rowId = state.rowId;
+
+    DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels,
+      values, nulls, valuesReused, valueReader, updater);
+
+    while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+
+      // Values to read in the current RLE/PACKED block, must be <= what's left in the page
+      int valuesLeftInBlock = Math.min(leftInPage, currentCount);
+
+      // The current row range start and end
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      switch (mode) {
+        case RLE:
+          // This RLE block consists of top-level rows, so we'll need to check
+          // if the rows should be skipped according to row indexes.
+          if (currentValue == 0) {
+            if (leftInBatch == 0) {
+              state.lastListCompleted = true;
+            } else {
+              // # of rows to read in the block, must be <= what's left in the current batch
+              int n = Math.min(leftInBatch, valuesLeftInBlock);
+
+              if (rowId + n < rangeStart) {
+                // Need to skip all rows in [rowId, rowId + n)
+                defLevelProcessor.skipValues(n);
+                rowId += n;
+                currentCount -= n;
+                leftInPage -= n;
+              } else if (rowId > rangeEnd) {
+                // The current row index is already beyond the current range: move to the next range
+                // and repeat
+                state.nextRange();
+              } else {
+                // The range [rowId, rowId + n) overlaps with the current row range
+                long start = Math.max(rangeStart, rowId);
+                long end = Math.min(rangeEnd, rowId + n - 1);
+
+                // Skip the rows in [rowId, start)
+                int toSkip = (int) (start - rowId);
+                if (toSkip > 0) {
+                  defLevelProcessor.skipValues(toSkip);
+                  rowId += toSkip;
+                  currentCount -= toSkip;
+                  leftInPage -= toSkip;
+                }
+
+                // Read the rows in [start, end]
+                n = (int) (end - start + 1);
+
+                if (n > 0) {
+                  repLevels.appendInts(n, 0);
+                  defLevelProcessor.readValues(n);
+                }
+
+                rowId += n;
+                currentCount -= n;
+                leftInBatch -= n;
+                leftInPage -= n;
+              }
+            }
+          } else {
+            // Not a top-level row: just read all the repetition levels in the block if the row
+            // should be included according to row indexes, else skip the rows.
+            if (!state.shouldSkip) {
+              repLevels.appendInts(valuesLeftInBlock, currentValue);
+            }
+            state.numBatchedDefLevels += valuesLeftInBlock;
+            leftInPage -= valuesLeftInBlock;
+            currentCount -= valuesLeftInBlock;
+          }
+          break;
+        case PACKED:
+          int i = 0;
+
+          for (; i < valuesLeftInBlock; i++) {
+            int currentValue = currentBuffer[currentBufferIdx + i];
+            if (currentValue == 0) {
+              if (leftInBatch == 0) {
+                state.lastListCompleted = true;
+                break;
+              } else if (rowId < rangeStart) {
+                // This is a top-level row, therefore check if we should skip it with row indexes.
+                // The row is before the current range, skip it
+                defLevelProcessor.skipValues(1);
+              } else if (rowId > rangeEnd) {
+                // The row is after the current range, move to the next range and compare again
+                state.nextRange();
+                break;
+              } else {
+                // The row is in the current range, decrement the row counter and read it
+                leftInBatch--;
+                repLevels.appendInt(0);
+                defLevelProcessor.readValues(1);
+              }
+              rowId++;
+            } else {
+              if (!state.shouldSkip) {
+                repLevels.appendInt(currentValue);
+              }
+              state.numBatchedDefLevels += 1;
+            }
+          }
+
+          leftInPage -= i;
+          currentCount -= i;
+          currentBufferIdx += i;
+          break;
+      }
+    }
+
+    // Process all the batched def levels
+    defLevelProcessor.finish();
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  private static class DefLevelProcessor {
+    private final VectorizedRleValuesReader reader;
+    private final ParquetReadState state;
+    private final WritableColumnVector defLevels;
+    private final WritableColumnVector values;
+    private final WritableColumnVector nulls;
+    private final boolean valuesReused;
+    private final VectorizedValuesReader valueReader;
+    private final ParquetVectorUpdater updater;
+
+    DefLevelProcessor(
+        VectorizedRleValuesReader reader,
+        ParquetReadState state,
+        WritableColumnVector defLevels,
+        WritableColumnVector values,
+        WritableColumnVector nulls,
+        boolean valuesReused,
+        VectorizedValuesReader valueReader,
+        ParquetVectorUpdater updater) {
+      this.reader = reader;
+      this.state = state;
+      this.defLevels = defLevels;
+      this.values = values;
+      this.nulls = nulls;
+      this.valuesReused = valuesReused;
+      this.valueReader = valueReader;
+      this.updater = updater;
+    }
+
+    void readValues(int n) {
+      if (!state.shouldSkip) {
+        state.numBatchedDefLevels += n;

Review comment:
       Apologies for a potentially dumb question, but why don't we call 
`reader.readValues(...)` here?
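      In case it helps frame the question: the pattern here seems to be "accumulate a count, then
      flush once in finish()" rather than calling into the reader per overlapping row range. A tiny
      hypothetical sketch of the two shapes I am comparing (made-up names, not the PR's classes):

```java
// Hypothetical illustration only; these are not the PR's actual classes.
interface LevelReader {
  void readValues(int n);
}

class EagerProcessor {
  // Reads definition levels immediately, once per overlapping row range.
  void readValues(int n, LevelReader reader) {
    reader.readValues(n);
  }
}

class BatchedProcessor {
  private int batched = 0;

  // Only remembers how many levels to read...
  void readValues(int n) {
    batched += n;
  }

  // ...and issues a single read for all accumulated ranges at the end.
  void finish(LevelReader reader) {
    if (batched > 0) {
      reader.readValues(batched);
      batched = 0;
    }
  }
}
```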

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
##########
@@ -39,12 +42,13 @@
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import scala.collection.JavaConverters;

Review comment:
       That should probably be moved after java imports 🙂. I am probably 
nit-picking here but maybe it would be a good nit to fix - it is up to you.
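      Something along these lines is what I had in mind (illustrative only; the surrounding imports
      and the helper method are made up):

```java
// Illustrative grouping only: java.* first, then scala.*, then org.apache.spark.*.
import java.util.Arrays;
import java.util.List;

import scala.collection.JavaConverters;
import scala.collection.mutable.Buffer;

import org.apache.spark.sql.types.StructType;

public class ImportOrderExample {
  // Tiny usage so the imports above are not dead code.
  static Buffer<String> fieldNames(StructType schema) {
    List<String> names = Arrays.asList(schema.fieldNames());
    return JavaConverters.asScalaBuffer(names);
  }
}
```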

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Contains necessary information representing a Parquet column, either of primitive or nested type.
+ */
+final class ParquetColumnVector {
+  private final ParquetColumn column;
+  private final List<ParquetColumnVector> children;
+  private final WritableColumnVector vector;
+
+  /**
+   * Repetition & Definition levels
+   * These are allocated only for leaf columns; for non-leaf columns, they simply maintain
+   * references to that of the former.
+   */
+  private WritableColumnVector repetitionLevels;
+  private WritableColumnVector definitionLevels;
+
+  /** Whether this column is primitive (i.e., leaf column) */
+  private final boolean isPrimitive;
+
+  /** Reader for this column - only set if 'isPrimitive' is true */
+  private VectorizedColumnReader columnReader;
+
+  ParquetColumnVector(
+      ParquetColumn column,
+      WritableColumnVector vector,
+      int capacity,
+      MemoryMode memoryMode,
+      Set<ParquetColumn> missingColumns) {
+
+    DataType sparkType = column.sparkType();
+    if (!sparkType.sameType(vector.dataType())) {
+      throw new IllegalArgumentException("Spark type: " + sparkType +
+        " doesn't match the type: " + vector.dataType() + " in column vector");
+    }
+
+    this.column = column;
+    this.vector = vector;
+    this.children = new ArrayList<>();
+    this.isPrimitive = column.isPrimitive();
+
+    if (missingColumns.contains(column)) {

Review comment:
       I would like to confirm that this check is by reference, e.g. if there 
are two instances of ParquetColumn with the same name and only one is in the 
set, the other one would return `false` when checking. If that is expected, it 
is all good.
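      For what it is worth, `Set.contains` goes through `equals`/`hashCode`, so the check is by
      reference only if `ParquetColumn` keeps the default `Object.equals`. A small standalone
      illustration with a made-up `Column` class (not the real `ParquetColumn`):

```java
import java.util.HashSet;
import java.util.Set;

public class ContainsSemanticsDemo {
  // Made-up stand-in for ParquetColumn; it does NOT override equals/hashCode,
  // so membership checks fall back to reference identity.
  static final class Column {
    final String name;
    Column(String name) { this.name = name; }
  }

  public static void main(String[] args) {
    Set<Column> missing = new HashSet<>();
    Column a = new Column("col");
    Column b = new Column("col"); // same name, different instance

    missing.add(a);
    System.out.println(missing.contains(a)); // true
    System.out.println(missing.contains(b)); // false: falls back to identity, equals is not overridden
  }
}
```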

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##########
@@ -763,6 +805,10 @@ protected boolean isArray() {
       DecimalType.isByteArrayDecimalType(type);
   }
 
+  protected boolean isStruct() {
+    return type instanceof StructType || type instanceof CalendarIntervalType;

Review comment:
      Do you know if we have tests for this? I am curious what happens if someone adds a union
      type or another composite type: will we need to update this method?
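      For my own notes: if I read the current layout right, a CalendarIntervalType vector is backed
      by three child vectors (months, days, microseconds), which is presumably why it is grouped
      with StructType here. A small standalone sketch of that layout (not from the PR):

```java
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.CalendarInterval;

public class IntervalVectorDemo {
  public static void main(String[] args) {
    // A CalendarInterval column is stored via child vectors, much like a struct:
    // child 0 = months (int), child 1 = days (int), child 2 = microseconds (long).
    OnHeapColumnVector vec = new OnHeapColumnVector(4, DataTypes.CalendarIntervalType);
    vec.getChild(0).putInt(0, 1);
    vec.getChild(1).putInt(0, 2);
    vec.getChild(2).putLong(0, 3_000_000L);

    CalendarInterval interval = vec.getInterval(0); // reassembled from the three children
    System.out.println(interval);
    vec.close();
  }
}
```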

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -187,6 +188,30 @@ object ParquetUtils {
     }
   }
 
+  /**
+   * Whether columnar read is supported for the input `schema`.
+   */
+  def isBatchReadSupportedForSchema(sqlConf: SQLConf, schema: StructType): Boolean =
+    sqlConf.parquetVectorizedReaderEnabled &&
+      schema.forall(f => isBatchReadSupported(sqlConf, f.dataType))
+
+  def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
+    case _: AtomicType =>
+      true
+    case at: ArrayType =>
+      sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+          isBatchReadSupported(sqlConf, at.elementType)
+    case mt: MapType =>
+      sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+          isBatchReadSupported(sqlConf, mt.keyType) &&
+          isBatchReadSupported(sqlConf, mt.valueType)
+    case st: StructType =>

Review comment:
      Should we support CalendarIntervalType here as well? I saw a method checking for that type
      too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


