This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new deac8f9  [SPARK-34863][SQL] Support complex types for Parquet 
vectorized reader
deac8f9 is described below

commit deac8f950edb1d893fe4bf2cc7c4adbd29d1db22
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Fri Apr 1 19:10:11 2022 -0700

    [SPARK-34863][SQL] Support complex types for Parquet vectorized reader
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for complex types (e.g., list, map, array) for Spark's 
vectorized Parquet reader. In particular, this introduces the following changes:
    1. Added a new class `ParquetColumnVector` which encapsulates all the 
necessary information needed when reading a Parquet column, including the 
`ParquetColumn` for the Parquet column, the repetition & definition levels 
(only allocated for a leaf-node of a complex type), as well as the reader for 
the column. In addition, it also contains logic for assembling nested columnar 
batches, via interpreting Parquet repetition & definition levels.
    2. Changes are made in `VectorizedParquetRecordReader` to initialize a list 
of `ParquetColumnVector` for the columns read.
    3. `VectorizedColumnReader` now also creates a reader for repetition 
column. Depending on whether maximum repetition level is 0, the batch read is 
now split into two code paths, e.g., `readBatch` versus `readBatchNested`.
    4. Added logic to handle complex type in `VectorizedRleValuesReader`. For 
data types involving only struct or primitive types, it still goes with the old 
`readBatch` method which now also saves definition levels into a vector for 
later assembly. Otherwise, for data types involving array or map, a separate 
code path `readBatchNested` is introduced to handle repetition levels.
    This PR also introduced a new flag 
`spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature 
on or off. By default it is on to facilitates all the Parquet related test 
coverage.
    
    ### Why are the changes needed?
    
    Whenever read schema containing complex types, at the moment Spark will 
fallback to the row-based reader in parquet-mr, which is much slower. As 
benchmark shows, by adding support into the vectorized reader, we can get ~15x 
on average speed up on reading struct fields, and ~1.5x when reading array of 
struct and map.
    
    ### Does this PR introduce _any_ user-facing change?
    
    With the PR Spark should now support reading complex types in its 
vectorized Parquet reader. A new config 
`spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn 
the feature on or off.
    
    ### How was this patch tested?
    
    Added new unit tests.
    
    Closes #34659 from sunchao/SPARK-34863-new.
    
    Authored-by: Chao Sun <sunc...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 +
 .../datasources/parquet/ParquetColumnVector.java   | 381 +++++++++++++++++++
 .../datasources/parquet/ParquetReadState.java      |  60 ++-
 .../parquet/SpecificParquetRecordReaderBase.java   |  15 +-
 .../parquet/VectorizedColumnReader.java            |  84 +++--
 .../parquet/VectorizedParquetRecordReader.java     | 160 +++++---
 .../parquet/VectorizedRleValuesReader.java         | 413 ++++++++++++++++++---
 .../execution/vectorized/OnHeapColumnVector.java   |   2 +-
 .../execution/vectorized/WritableColumnVector.java |  48 ++-
 .../datasources/parquet/ParquetFileFormat.scala    |   8 +-
 .../parquet/ParquetSchemaConverter.scala           |  17 +-
 .../datasources/parquet/ParquetUtils.scala         |  27 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   4 +-
 .../sql-tests/results/explain-aqe.sql.out          |   3 +-
 .../resources/sql-tests/results/explain.sql.out    |   3 +-
 .../datasources/FileBasedDataSourceTest.scala      |   9 +-
 .../sql/execution/datasources/orc/OrcTest.scala    |   2 +
 .../datasources/orc/OrcV1SchemaPruningSuite.scala  |   2 +
 .../datasources/orc/OrcV2SchemaPruningSuite.scala  |   2 +
 .../parquet/ParquetColumnIndexSuite.scala          |  13 +
 .../parquet/ParquetFileFormatSuite.scala           |  37 ++
 .../datasources/parquet/ParquetIOSuite.scala       | 351 +++++++++++++++++
 .../parquet/ParquetSchemaPruningSuite.scala        |   2 +
 .../datasources/parquet/ParquetTest.scala          |   2 +
 .../parquet/ParquetVectorizedSuite.scala           | 330 ++++++++++++++++
 25 files changed, 1813 insertions(+), 173 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9aad649..d268fd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1009,6 +1009,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
+    buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader")
+      .doc("Enables vectorized Parquet decoding for nested columns (e.g., 
struct, list, map). " +
+          s"Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARQUET_RECORD_FILTER_ENABLED = 
buildConf("spark.sql.parquet.recordLevelFilter.enabled")
     .doc("If true, enables Parquet's native record-level filtering using the 
pushed down " +
       "filters. " +
@@ -3926,6 +3934,9 @@ class SQLConf extends Serializable with Logging {
 
   def parquetVectorizedReaderEnabled: Boolean = 
getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetVectorizedReaderNestedColumnEnabled: Boolean =
+    getConf(PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED)
+
   def parquetVectorizedReaderBatchSize: Int = 
getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
new file mode 100644
index 0000000..4b29520
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Contains necessary information representing a Parquet column, either of 
primitive or nested type.
+ */
+final class ParquetColumnVector {
+  private final ParquetColumn column;
+  private final List<ParquetColumnVector> children;
+  private final WritableColumnVector vector;
+
+  /**
+   * Repetition & Definition levels
+   * These are allocated only for leaf columns; for non-leaf columns, they 
simply maintain
+   * references to that of the former.
+   */
+  private WritableColumnVector repetitionLevels;
+  private WritableColumnVector definitionLevels;
+
+  /** Whether this column is primitive (i.e., leaf column) */
+  private final boolean isPrimitive;
+
+  /** Reader for this column - only set if 'isPrimitive' is true */
+  private VectorizedColumnReader columnReader;
+
+  ParquetColumnVector(
+      ParquetColumn column,
+      WritableColumnVector vector,
+      int capacity,
+      MemoryMode memoryMode,
+      Set<ParquetColumn> missingColumns) {
+
+    DataType sparkType = column.sparkType();
+    if (!sparkType.sameType(vector.dataType())) {
+      throw new IllegalArgumentException("Spark type: " + sparkType +
+        " doesn't match the type: " + vector.dataType() + " in column vector");
+    }
+
+    this.column = column;
+    this.vector = vector;
+    this.children = new ArrayList<>();
+    this.isPrimitive = column.isPrimitive();
+
+    if (missingColumns.contains(column)) {
+      vector.setAllNull();
+      return;
+    }
+
+    if (isPrimitive) {
+      // TODO: avoid allocating these if not necessary, for instance, the node 
is of top-level
+      //  and is not repeated, or the node is not top-level but its max 
repetition level is 0.
+      repetitionLevels = allocateLevelsVector(capacity, memoryMode);
+      definitionLevels = allocateLevelsVector(capacity, memoryMode);
+    } else {
+      Preconditions.checkArgument(column.children().size() == 
vector.getNumChildren());
+      for (int i = 0; i < column.children().size(); i++) {
+        ParquetColumnVector childCv = new 
ParquetColumnVector(column.children().apply(i),
+          vector.getChild(i), capacity, memoryMode, missingColumns);
+        children.add(childCv);
+
+        // Only use levels from non-missing child, this can happen if only 
some but not all
+        // fields of a struct are missing.
+        if (!childCv.vector.isAllNull()) {
+          this.repetitionLevels = childCv.repetitionLevels;
+          this.definitionLevels = childCv.definitionLevels;
+        }
+      }
+
+      // This can happen if all the fields of a struct are missing, in which 
case we should mark
+      // the struct itself as a missing column
+      if (repetitionLevels == null) {
+        vector.setAllNull();
+      }
+    }
+  }
+
+  /**
+   * Returns all the children of this column.
+   */
+  List<ParquetColumnVector> getChildren() {
+    return children;
+  }
+
+  /**
+   * Returns all the leaf columns in depth-first order.
+   */
+  List<ParquetColumnVector> getLeaves() {
+    List<ParquetColumnVector> result = new ArrayList<>();
+    getLeavesHelper(this, result);
+    return result;
+  }
+
+  private static void getLeavesHelper(ParquetColumnVector vector, 
List<ParquetColumnVector> coll) {
+    if (vector.isPrimitive) {
+      coll.add(vector);
+    } else {
+      for (ParquetColumnVector child : vector.children) {
+        getLeavesHelper(child, coll);
+      }
+    }
+  }
+
+  /**
+   * Assembles this column and calculate collection offsets recursively.
+   * This is a no-op for primitive columns.
+   */
+  void assemble() {
+    // nothing to do if the column itself is missing
+    if (vector.isAllNull()) return;
+
+    DataType type = column.sparkType();
+    if (type instanceof ArrayType || type instanceof MapType) {
+      for (ParquetColumnVector child : children) {
+        child.assemble();
+      }
+      assembleCollection();
+    } else if (type instanceof StructType) {
+      for (ParquetColumnVector child : children) {
+        child.assemble();
+      }
+      assembleStruct();
+    }
+  }
+
+  /**
+   * Resets this Parquet column vector, which includes resetting all the 
writable column vectors
+   * (used to store values, definition levels, and repetition levels) for this 
and all its children.
+   */
+  void reset() {
+    // nothing to do if the column itself is missing
+    if (vector.isAllNull()) return;
+
+    vector.reset();
+    repetitionLevels.reset();
+    definitionLevels.reset();
+    for (ParquetColumnVector child : children) {
+      child.reset();
+    }
+  }
+
+  /**
+   * Returns the {@link ParquetColumn} of this column vector.
+   */
+  ParquetColumn getColumn() {
+    return this.column;
+  }
+
+  /**
+   * Returns the writable column vector used to store values.
+   */
+  WritableColumnVector getValueVector() {
+    return this.vector;
+  }
+
+  /**
+   * Returns the writable column vector used to store repetition levels.
+   */
+  WritableColumnVector getRepetitionLevelVector() {
+    return this.repetitionLevels;
+  }
+
+  /**
+   * Returns the writable column vector used to store definition levels.
+   */
+  WritableColumnVector getDefinitionLevelVector() {
+    return this.definitionLevels;
+  }
+
+  /**
+   * Returns the column reader for reading a Parquet column.
+   */
+  VectorizedColumnReader getColumnReader() {
+    return this.columnReader;
+  }
+
+  /**
+   * Sets the column vector to 'reader'. Note this can only be called on a 
primitive Parquet
+   * column.
+   */
+  void setColumnReader(VectorizedColumnReader reader) {
+    if (!isPrimitive) {
+      throw new IllegalStateException("Can't set reader for non-primitive 
column");
+    }
+    this.columnReader = reader;
+  }
+
+  /**
+   * Assemble collections, e.g., array, map.
+   */
+  private void assembleCollection() {
+    int maxDefinitionLevel = column.definitionLevel();
+    int maxElementRepetitionLevel = column.repetitionLevel();
+
+    // There are 4 cases when calculating definition levels:
+    //   1. definitionLevel == maxDefinitionLevel
+    //     ==> value is defined and not null
+    //   2. definitionLevel == maxDefinitionLevel - 1
+    //     ==> value is null
+    //   3. definitionLevel < maxDefinitionLevel - 1
+    //     ==> value doesn't exist since one of its optional parents is null
+    //   4. definitionLevel > maxDefinitionLevel
+    //     ==> value is a nested element within an array or map
+    //
+    // `i` is the index over all leaf elements of this array, while `offset` 
is the index over
+    // all top-level elements of this array.
+    int rowId = 0;
+    for (int i = 0, offset = 0; i < definitionLevels.getElementsAppended();
+         i = getNextCollectionStart(maxElementRepetitionLevel, i)) {
+      vector.reserve(rowId + 1);
+      int definitionLevel = definitionLevels.getInt(i);
+      if (definitionLevel <= maxDefinitionLevel) {
+        // This means the value is not an array element, but a collection that 
is either null or
+        // empty. In this case, we should increase offset to skip it when 
returning an array
+        // starting from the offset.
+        //
+        // For instance, considering an array of strings with 3 elements like 
the following:
+        //  null, [], [a, b, c]
+        // the child array (which is of String type) in this case will be:
+        //  null:   1 1 0 0 0
+        //  length: 0 0 1 1 1
+        //  offset: 0 0 0 1 2
+        // and the array itself will be:
+        //  null:   1 0 0
+        //  length: 0 0 3
+        //  offset: 0 1 2
+        //
+        // It's important that for the third element `[a, b, c]`, the offset 
in the array
+        // (not the elements) starts from 2 since otherwise we'd include the 
first & second null
+        // element from child array in the result.
+        offset += 1;
+      }
+      if (definitionLevel <= maxDefinitionLevel - 1) {
+        // Collection is null or one of its optional parents is null
+        vector.putNull(rowId++);
+      } else if (definitionLevel == maxDefinitionLevel) {
+        // Collection is defined but empty
+        vector.putNotNull(rowId);
+        vector.putArray(rowId, offset, 0);
+        rowId++;
+      } else if (definitionLevel > maxDefinitionLevel) {
+        // Collection is defined and non-empty: find out how many top elements 
are there until the
+        // start of the next array.
+        vector.putNotNull(rowId);
+        int length = getCollectionSize(maxElementRepetitionLevel, i);
+        vector.putArray(rowId, offset, length);
+        offset += length;
+        rowId++;
+      }
+    }
+    vector.addElementsAppended(rowId);
+  }
+
+  private void assembleStruct() {
+    int maxRepetitionLevel = column.repetitionLevel();
+    int maxDefinitionLevel = column.definitionLevel();
+
+    vector.reserve(definitionLevels.getElementsAppended());
+
+    int rowId = 0;
+    boolean hasRepetitionLevels = repetitionLevels.getElementsAppended() > 0;
+    for (int i = 0; i < definitionLevels.getElementsAppended(); i++) {
+      // If repetition level > maxRepetitionLevel, the value is a nested 
element (e.g., an array
+      // element in struct<array<int>>), and we should skip the definition 
level since it doesn't
+      // represent with the struct.
+      if (!hasRepetitionLevels || repetitionLevels.getInt(i) <= 
maxRepetitionLevel) {
+        if (definitionLevels.getInt(i) <= maxDefinitionLevel - 1) {
+          // Struct is null
+          vector.putNull(rowId);
+          rowId++;
+        } else if (definitionLevels.getInt(i) >= maxDefinitionLevel) {
+          vector.putNotNull(rowId);
+          rowId++;
+        }
+      }
+    }
+    vector.addElementsAppended(rowId);
+  }
+
+  private static WritableColumnVector allocateLevelsVector(int capacity, 
MemoryMode memoryMode) {
+    switch (memoryMode) {
+      case ON_HEAP:
+        return new OnHeapColumnVector(capacity, DataTypes.IntegerType);
+      case OFF_HEAP:
+        return new OffHeapColumnVector(capacity, DataTypes.IntegerType);
+      default:
+        throw new IllegalArgumentException("Unknown memory mode: " + 
memoryMode);
+    }
+  }
+
+  /**
+   * For a collection (i.e., array or map) element at index 'idx', returns the 
starting index of
+   * the next collection after it.
+   *
+   * @param maxRepetitionLevel the maximum repetition level for the elements 
in this collection
+   * @param idx the index of this collection in the Parquet column
+   * @return the starting index of the next collection
+   */
+  private int getNextCollectionStart(int maxRepetitionLevel, int idx) {
+    idx += 1;
+    for (; idx < repetitionLevels.getElementsAppended(); idx++) {
+      if (repetitionLevels.getInt(idx) <= maxRepetitionLevel) {
+        break;
+      }
+    }
+    return idx;
+  }
+
+  /**
+   * Gets the size of a collection (i.e., array or map) element, starting at 
'idx'.
+   *
+   * @param maxRepetitionLevel the maximum repetition level for the elements 
in this collection
+   * @param idx the index of this collection in the Parquet column
+   * @return the size of this collection
+   */
+  private int getCollectionSize(int maxRepetitionLevel, int idx) {
+    int size = 1;
+    for (idx += 1; idx < repetitionLevels.getElementsAppended(); idx++) {
+      if (repetitionLevels.getInt(idx) <= maxRepetitionLevel) {
+        break;
+      } else if (repetitionLevels.getInt(idx) <= maxRepetitionLevel + 1) {
+        // Only count elements which belong to the current collection
+        // For instance, suppose we have the following Parquet schema:
+        //
+        // message schema {                        max rl   max dl
+        //   optional group col (LIST) {              0        1
+        //     repeated group list {                  1        2
+        //       optional group element (LIST) {      1        3
+        //         repeated group list {              2        4
+        //           required int32 element;          2        4
+        //         }
+        //       }
+        //     }
+        //   }
+        // }
+        //
+        // For a list such as: [[[0, 1], [2, 3]], [[4, 5], [6, 7]]], the 
repetition & definition
+        // levels would be:
+        //
+        // repetition levels: [0, 2, 1, 2, 0, 2, 1, 2]
+        // definition levels: [2, 2, 2, 2, 2, 2, 2, 2]
+        //
+        // When calculating collection size for the outer array, we should 
only count repetition
+        // levels whose value is <= 1 (which is the max repetition level for 
the inner array)
+        size++;
+      }
+    }
+    return size;
+  }
+}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
index b260887..bde6940 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import org.apache.parquet.column.ColumnDescriptor;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -42,24 +44,52 @@ final class ParquetReadState {
   /** The current row range */
   private RowRange currentRange;
 
+  /** Maximum repetition level for the Parquet column */
+  final int maxRepetitionLevel;
+
   /** Maximum definition level for the Parquet column */
   final int maxDefinitionLevel;
 
+  /** Whether this column is required */
+  final boolean isRequired;
+
   /** The current index over all rows within the column chunk. This is used to 
check if the
    * current row should be skipped by comparing against the row ranges. */
   long rowId;
 
-  /** The offset in the current batch to put the next value */
-  int offset;
+  /** The offset in the current batch to put the next value in value vector */
+  int valueOffset;
+
+  /** The offset in the current batch to put the next value in repetition & 
definition vector */
+  int levelOffset;
 
   /** The remaining number of values to read in the current page */
   int valuesToReadInPage;
 
-  /** The remaining number of values to read in the current batch */
-  int valuesToReadInBatch;
+  /** The remaining number of rows to read in the current batch */
+  int rowsToReadInBatch;
+
+
+  /* The following fields are only used when reading repeated values */
+
+  /** When processing repeated values, whether we've found the beginning of 
the first list after the
+   *  current batch. */
+  boolean lastListCompleted;
 
-  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
-    this.maxDefinitionLevel = maxDefinitionLevel;
+  /** When processing repeated types, the number of accumulated definition 
levels to process */
+  int numBatchedDefLevels;
+
+  /** When processing repeated types, whether we should skip the current batch 
of definition
+   * levels. */
+  boolean shouldSkip;
+
+  ParquetReadState(
+      ColumnDescriptor descriptor,
+      boolean isRequired,
+      PrimitiveIterator.OfLong rowIndexes) {
+    this.maxRepetitionLevel = descriptor.getMaxRepetitionLevel();
+    this.maxDefinitionLevel = descriptor.getMaxDefinitionLevel();
+    this.isRequired = isRequired;
     this.rowRanges = constructRanges(rowIndexes);
     nextRange();
   }
@@ -101,8 +131,12 @@ final class ParquetReadState {
    * Must be called at the beginning of reading a new batch.
    */
   void resetForNewBatch(int batchSize) {
-    this.offset = 0;
-    this.valuesToReadInBatch = batchSize;
+    this.valueOffset = 0;
+    this.levelOffset = 0;
+    this.rowsToReadInBatch = batchSize;
+    this.lastListCompleted = this.maxRepetitionLevel == 0; // always true for 
non-repeated column
+    this.numBatchedDefLevels = 0;
+    this.shouldSkip = false;
   }
 
   /**
@@ -128,16 +162,6 @@ final class ParquetReadState {
   }
 
   /**
-   * Advance the current offset and rowId to the new values.
-   */
-  void advanceOffsetAndRowId(int newOffset, long newRowId) {
-    valuesToReadInBatch -= (newOffset - offset);
-    valuesToReadInPage -= (newRowId - rowId);
-    offset = newOffset;
-    rowId = newRowId;
-  }
-
-  /**
    * Advance to the next range.
    */
   void nextRange() {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 5669534..292a0f9 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -74,6 +74,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
   // Keep track of the version of the parquet writer. An older version wrote
   // corrupt delta byte arrays, and the version check is needed to detect that.
   protected ParsedVersion writerVersion;
+  protected ParquetColumn parquetColumn;
 
   /**
    * The total number of rows this RecordReader will eventually read. The sum 
of the
@@ -112,7 +113,11 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    this.sparkSchema = 
StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    StructType sparkRequestedSchema = 
StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    ParquetToSparkSchemaConverter converter = new 
ParquetToSparkSchemaConverter(configuration);
+    this.parquetColumn = converter.convertParquetColumn(requestedSchema,
+      Option.apply(sparkRequestedSchema));
+    this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = fileReader.getFilteredRecordCount();
 
     // For test purpose.
@@ -174,7 +179,9 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
       }
     }
     fileReader.setRequestedSchema(requestedSchema);
-    this.sparkSchema = new 
ParquetToSparkSchemaConverter(config).convert(requestedSchema);
+    this.parquetColumn = new ParquetToSparkSchemaConverter(config)
+      .convertParquetColumn(requestedSchema, Option.empty());
+    this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = fileReader.getFilteredRecordCount();
   }
 
@@ -191,7 +198,9 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    this.sparkSchema = new 
ParquetToSparkSchemaConverter(config).convert(requestedSchema);
+    this.parquetColumn = new ParquetToSparkSchemaConverter(config)
+      .convertParquetColumn(requestedSchema, Option.empty());
+    this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = totalRowCount;
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ee09d2b..c2e85da 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
 import java.time.ZoneId;
-import java.util.PrimitiveIterator;
 
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
@@ -41,7 +40,6 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.Decimal;
 
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
@@ -70,6 +68,11 @@ public class VectorizedColumnReader {
   private VectorizedRleValuesReader defColumn;
 
   /**
+   * Vectorized RLE decoder for repetition levels
+   */
+  private VectorizedRleValuesReader repColumn;
+
+  /**
    * Factory to get type-specific vector updater.
    */
   private final ParquetVectorUpdaterFactory updaterFactory;
@@ -93,9 +96,8 @@ public class VectorizedColumnReader {
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
-      LogicalTypeAnnotation logicalTypeAnnotation,
-      PageReader pageReader,
-      PrimitiveIterator.OfLong rowIndexes,
+      boolean isRequired,
+      PageReadStore pageReadStore,
       ZoneId convertTz,
       String datetimeRebaseMode,
       String datetimeRebaseTz,
@@ -103,9 +105,10 @@ public class VectorizedColumnReader {
       String int96RebaseTz,
       ParsedVersion writerVersion) throws IOException {
     this.descriptor = descriptor;
-    this.pageReader = pageReader;
-    this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), 
rowIndexes);
-    this.logicalTypeAnnotation = logicalTypeAnnotation;
+    this.pageReader = pageReadStore.getPageReader(descriptor);
+    this.readState = new ParquetReadState(descriptor, isRequired,
+      pageReadStore.getRowIndexes().orElse(null));
+    this.logicalTypeAnnotation = 
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
     this.updaterFactory = new ParquetVectorUpdaterFactory(
       logicalTypeAnnotation,
       convertTz,
@@ -161,9 +164,13 @@ public class VectorizedColumnReader {
   }
 
   /**
-   * Reads `total` values from this columnReader into column.
+   * Reads `total` rows from this columnReader into column.
    */
-  void readBatch(int total, WritableColumnVector column) throws IOException {
+  void readBatch(
+      int total,
+      WritableColumnVector column,
+      WritableColumnVector repetitionLevels,
+      WritableColumnVector definitionLevels) throws IOException {
     WritableColumnVector dictionaryIds = null;
     ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, 
column.dataType());
 
@@ -174,22 +181,32 @@ public class VectorizedColumnReader {
       dictionaryIds = column.reserveDictionaryIds(total);
     }
     readState.resetForNewBatch(total);
-    while (readState.valuesToReadInBatch > 0) {
+    while (readState.rowsToReadInBatch > 0 || !readState.lastListCompleted) {
       if (readState.valuesToReadInPage == 0) {
         int pageValueCount = readPage();
+        if (pageValueCount < 0) {
+          // we've read all the pages; this could happen when we're reading a 
repeated list and we
+          // don't know where the list will end until we've seen all the pages.
+          break;
+        }
         readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
       }
       PrimitiveType.PrimitiveTypeName typeName =
           descriptor.getPrimitiveType().getPrimitiveTypeName();
       if (isCurrentPageDictionaryEncoded) {
         // Save starting offset in case we need to decode dictionary IDs.
-        int startOffset = readState.offset;
+        int startOffset = readState.valueOffset;
         // Save starting row index so we can check if we need to eagerly 
decode dict ids later
         long startRowId = readState.rowId;
 
         // Read and decode dictionary ids.
-        defColumn.readIntegers(readState, dictionaryIds, column,
-          (VectorizedValuesReader) dataColumn);
+        if (readState.maxRepetitionLevel == 0) {
+          defColumn.readIntegers(readState, dictionaryIds, column, 
definitionLevels,
+            (VectorizedValuesReader) dataColumn);
+        } else {
+          repColumn.readIntegersRepeated(readState, repetitionLevels, 
defColumn, definitionLevels,
+            dictionaryIds, column, (VectorizedValuesReader) dataColumn);
+        }
 
         // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we 
need to post process
         // the values to add microseconds precision.
@@ -220,24 +237,32 @@ public class VectorizedColumnReader {
           boolean needTransform = castLongToInt || isUnsignedInt32 || 
isUnsignedInt64;
           column.setDictionary(new ParquetDictionary(dictionary, 
needTransform));
         } else {
-          updater.decodeDictionaryIds(readState.offset - startOffset, 
startOffset, column,
+          updater.decodeDictionaryIds(readState.valueOffset - startOffset, 
startOffset, column,
             dictionaryIds, dictionary);
         }
       } else {
-        if (column.hasDictionary() && readState.offset != 0) {
+        if (column.hasDictionary() && readState.valueOffset != 0) {
           // This batch already has dictionary encoded values but this new 
page is not. The batch
           // does not support a mix of dictionary and not so we will decode 
the dictionary.
-          updater.decodeDictionaryIds(readState.offset, 0, column, 
dictionaryIds, dictionary);
+          updater.decodeDictionaryIds(readState.valueOffset, 0, column, 
dictionaryIds, dictionary);
         }
         column.setDictionary(null);
         VectorizedValuesReader valuesReader = (VectorizedValuesReader) 
dataColumn;
-        defColumn.readBatch(readState, column, valuesReader, updater);
+        if (readState.maxRepetitionLevel == 0) {
+          defColumn.readBatch(readState, column, definitionLevels, 
valuesReader, updater);
+        } else {
+          repColumn.readBatchRepeated(readState, repetitionLevels, defColumn, 
definitionLevels,
+            column, valuesReader, updater);
+        }
       }
     }
   }
 
   private int readPage() {
     DataPage page = pageReader.readPage();
+    if (page == null) {
+      return -1;
+    }
     this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
 
     return page.accept(new DataPage.Visitor<Integer>() {
@@ -328,18 +353,18 @@ public class VectorizedColumnReader {
     }
 
     int pageValueCount = page.getValueCount();
-    int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
 
-    this.defColumn = new VectorizedRleValuesReader(bitWidth);
+    int rlBitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxRepetitionLevel());
+    this.repColumn = new VectorizedRleValuesReader(rlBitWidth);
+
+    int dlBitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+    this.defColumn = new VectorizedRleValuesReader(dlBitWidth);
+
     try {
       BytesInput bytes = page.getBytes();
       ByteBufferInputStream in = bytes.toInputStream();
 
-      // only used now to consume the repetition level data
-      page.getRlEncoding()
-          .getValuesReader(descriptor, REPETITION_LEVEL)
-          .initFromPage(pageValueCount, in);
-
+      repColumn.initFromPage(pageValueCount, in);
       defColumn.initFromPage(pageValueCount, in);
       initDataReader(pageValueCount, page.getValueEncoding(), in);
       return pageValueCount;
@@ -350,11 +375,16 @@ public class VectorizedColumnReader {
 
   private int readPageV2(DataPageV2 page) throws IOException {
     int pageValueCount = page.getValueCount();
-    int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
 
     // do not read the length from the stream. v2 pages handle dividing the 
page bytes.
-    defColumn = new VectorizedRleValuesReader(bitWidth, false);
+    int rlBitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxRepetitionLevel());
+    repColumn = new VectorizedRleValuesReader(rlBitWidth, false);
+    repColumn.initFromPage(pageValueCount, 
page.getRepetitionLevels().toInputStream());
+
+    int dlBitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+    defColumn = new VectorizedRleValuesReader(dlBitWidth, false);
     defColumn.initFromPage(pageValueCount, 
page.getDefinitionLevels().toInputStream());
+
     try {
       initDataReader(pageValueCount, page.getDataEncoding(), 
page.getData().toInputStream());
       return pageValueCount;
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index da23b5f..80f6f88 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -20,13 +20,17 @@ package org.apache.spark.sql.execution.datasources.parquet;
 import java.io.IOException;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import scala.collection.JavaConverters;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -44,7 +48,7 @@ import org.apache.spark.sql.types.StructType;
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches 
directly using the
  * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader.
  *
- * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. 
Schema mismatch.
+ * TODO: decimal requiring more than 8 bytes, INT96. Schema mismatch.
  * All of these can be handled efficiently and easily with codegen.
  *
  * This class can either return InternalRows or ColumnarBatches. With whole 
stage codegen
@@ -64,10 +68,10 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   private int numBatched = 0;
 
   /**
-   * For each request column, the reader to read this column. This is NULL if 
this column
-   * is missing from the file, in which case we populate the attribute with 
NULL.
+   * Encapsulate writable column vectors with other Parquet related info such 
as
+   * repetition / definition levels.
    */
-  private VectorizedColumnReader[] columnReaders;
+  private ParquetColumnVector[] columnVectors;
 
   /**
    * The number of rows that have been returned.
@@ -80,9 +84,10 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   private long totalCountLoadedSoFar = 0;
 
   /**
-   * For each column, true if the column is missing in the file and we'll 
instead return NULLs.
+   * For each leaf column, if it is in the set, it means the column is missing 
in the file and
+   * we'll instead return NULLs.
    */
-  private boolean[] missingColumns;
+  private Set<ParquetColumn> missingColumns;
 
   /**
    * The timezone that timestamp INT96 values should be converted to. Null if 
no conversion. Here to
@@ -120,8 +125,6 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
    */
   private ColumnarBatch columnarBatch;
 
-  private WritableColumnVector[] columnVectors;
-
   /**
    * If true, this class returns batches instead of rows.
    */
@@ -246,25 +249,25 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
       }
     }
 
+    WritableColumnVector[] vectors;
     if (memMode == MemoryMode.OFF_HEAP) {
-      columnVectors = OffHeapColumnVector.allocateColumns(capacity, 
batchSchema);
+      vectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
     } else {
-      columnVectors = OnHeapColumnVector.allocateColumns(capacity, 
batchSchema);
+      vectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
+    }
+    columnarBatch = new ColumnarBatch(vectors);
+
+    columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
+    for (int i = 0; i < columnVectors.length; i++) {
+      columnVectors[i] = new 
ParquetColumnVector(parquetColumn.children().apply(i),
+        vectors[i], capacity, memMode, missingColumns);
     }
-    columnarBatch = new ColumnarBatch(columnVectors);
+
     if (partitionColumns != null) {
       int partitionIdx = sparkSchema.fields().length;
       for (int i = 0; i < partitionColumns.fields().length; i++) {
-        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], 
partitionValues, i);
-        columnVectors[i + partitionIdx].setIsConstant();
-      }
-    }
-
-    // Initialize missing columns with nulls.
-    for (int i = 0; i < missingColumns.length; i++) {
-      if (missingColumns[i]) {
-        columnVectors[i].putNulls(0, capacity);
-        columnVectors[i].setIsConstant();
+        ColumnVectorUtils.populate(vectors[i + partitionIdx], partitionValues, 
i);
+        vectors[i + partitionIdx].setIsConstant();
       }
     }
   }
@@ -298,7 +301,7 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
    * Advances to the next batch of rows. Returns false if there are no more.
    */
   public boolean nextBatch() throws IOException {
-    for (WritableColumnVector vector : columnVectors) {
+    for (ParquetColumnVector vector : columnVectors) {
       vector.reset();
     }
     columnarBatch.setNumRows(0);
@@ -306,10 +309,17 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
     checkEndOfRowGroup();
 
     int num = (int) Math.min(capacity, totalCountLoadedSoFar - rowsReturned);
-    for (int i = 0; i < columnReaders.length; ++i) {
-      if (columnReaders[i] == null) continue;
-      columnReaders[i].readBatch(num, columnVectors[i]);
+    for (ParquetColumnVector cv : columnVectors) {
+      for (ParquetColumnVector leafCv : cv.getLeaves()) {
+        VectorizedColumnReader columnReader = leafCv.getColumnReader();
+        if (columnReader != null) {
+          columnReader.readBatch(num, leafCv.getValueVector(),
+            leafCv.getRepetitionLevelVector(), 
leafCv.getDefinitionLevelVector());
+        }
+      }
+      cv.assemble();
     }
+
     rowsReturned += num;
     columnarBatch.setNumRows(num);
     numBatched = num;
@@ -318,34 +328,61 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
   }
 
   private void initializeInternal() throws IOException, 
UnsupportedOperationException {
-    // Check that the requested schema is supported.
-    missingColumns = new boolean[requestedSchema.getFieldCount()];
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    List<String[]> paths = requestedSchema.getPaths();
-    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
-      Type t = requestedSchema.getFields().get(i);
-      if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
-        throw new UnsupportedOperationException("Complex types not 
supported.");
-      }
+    missingColumns = new HashSet<>();
+    for (ParquetColumn column : 
JavaConverters.seqAsJavaList(parquetColumn.children())) {
+      checkColumn(column);
+    }
+  }
 
-      String[] colPath = paths.get(i);
-      if (fileSchema.containsPath(colPath)) {
-        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(columns.get(i))) {
+  /**
+   * Check whether a column from requested schema is missing from the file 
schema, or whether it
+   * conforms to the type of the file schema.
+   */
+  private void checkColumn(ParquetColumn column) throws IOException {
+    String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new 
String[0]);
+    if (containsPath(fileSchema, path)) {
+      if (column.isPrimitive()) {
+        ColumnDescriptor desc = column.descriptor().get();
+        ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath());
+        if (!fd.equals(desc)) {
           throw new UnsupportedOperationException("Schema evolution not 
supported.");
         }
-        missingColumns[i] = false;
       } else {
-        if (columns.get(i).getMaxDefinitionLevel() == 0) {
-          // Column is missing in data but the required data is non-nullable. 
This file is invalid.
-          throw new IOException("Required column is missing in data file. Col: 
" +
-            Arrays.toString(colPath));
+        for (ParquetColumn childColumn : 
JavaConverters.seqAsJavaList(column.children())) {
+          checkColumn(childColumn);
         }
-        missingColumns[i] = true;
       }
+    } else { // A missing column which is either primitive or complex
+      if (column.required()) {
+        // Column is missing in data but the required data is non-nullable. 
This file is invalid.
+        throw new IOException("Required column is missing in data file. Col: " 
+
+          Arrays.toString(path));
+      }
+      missingColumns.add(column);
     }
   }
 
+  /**
+   * Checks whether the given 'path' exists in 'parquetType'. The difference 
between this and
+   * {@link MessageType#containsPath(String[])} is that the latter only 
support paths to leaf
+   * nodes, while this support paths both to leaf and non-leaf nodes.
+   */
+  private boolean containsPath(Type parquetType, String[] path) {
+    return containsPath(parquetType, path, 0);
+  }
+
+  private boolean containsPath(Type parquetType, String[] path, int depth) {
+    if (path.length == depth) return true;
+    if (parquetType instanceof GroupType) {
+      String fieldName = path[depth];
+      GroupType parquetGroupType = (GroupType) parquetType;
+      if (parquetGroupType.containsField(fieldName)) {
+        return containsPath(parquetGroupType.getType(fieldName), path, depth + 
1);
+      }
+    }
+    return false;
+  }
+
   private void checkEndOfRowGroup() throws IOException {
     if (rowsReturned != totalCountLoadedSoFar) return;
     PageReadStore pages = reader.readNextRowGroup();
@@ -353,23 +390,26 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa
       throw new IOException("expecting more rows but reached last block. Read "
           + rowsReturned + " out of " + totalRowCount);
     }
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    List<Type> types = requestedSchema.asGroupType().getFields();
-    columnReaders = new VectorizedColumnReader[columns.size()];
-    for (int i = 0; i < columns.size(); ++i) {
-      if (missingColumns[i]) continue;
-      columnReaders[i] = new VectorizedColumnReader(
-        columns.get(i),
-        types.get(i).getLogicalTypeAnnotation(),
-        pages.getPageReader(columns.get(i)),
-        pages.getRowIndexes().orElse(null),
-        convertTz,
-        datetimeRebaseMode,
-        datetimeRebaseTz,
-        int96RebaseMode,
-        int96RebaseTz,
-        writerVersion);
+    for (ParquetColumnVector cv : columnVectors) {
+      initColumnReader(pages, cv);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
+
+  private void initColumnReader(PageReadStore pages, ParquetColumnVector cv) 
throws IOException {
+    if (!missingColumns.contains(cv.getColumn())) {
+      if (cv.getColumn().isPrimitive()) {
+        ParquetColumn column = cv.getColumn();
+        VectorizedColumnReader reader = new VectorizedColumnReader(
+          column.descriptor().get(), column.required(), pages, convertTz, 
datetimeRebaseMode,
+          datetimeRebaseTz, int96RebaseMode, int96RebaseTz, writerVersion);
+        cv.setColumnReader(reader);
+      } else {
+        // Not in missing columns and is a complex type: this must be a struct
+        for (ParquetColumnVector childCv : cv.getChildren()) {
+          initColumnReader(pages, childCv);
+        }
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index bd7cbc7..2cc763a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -96,13 +96,13 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   public void initFromPage(int valueCount, ByteBufferInputStream in) throws 
IOException {
     this.in = in;
     if (fixedWidth) {
-      // initialize for repetition and definition levels
+      // Initialize for repetition and definition levels
       if (readLength) {
         int length = readIntLittleEndian();
         this.in = in.sliceStream(length);
       }
     } else {
-      // initialize for values
+      // Initialize for values
       if (in.available() > 0) {
         init(in.read());
       }
@@ -157,47 +157,52 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   /**
-   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
-   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
-   * `state`. The type-specific `updater` is used to update or skip values.
+   * Reads a batch of definition levels and values into vector 'defLevels' and 
'values'
+   * respectively. The values are read using 'valueReader'.
    * <p>
-   * This reader reads the definition levels and then will read from 
`valueReader` for the
-   * non-null values. If the value is null, `values` will be populated with 
null value.
+   * The related states such as row index, offset, number of values left in 
the batch and page,
+   * are tracked by 'state'. The type-specific 'updater' is used to update or 
skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from 
'valueReader' for the
+   * non-null values. If the value is null, 'values' will be populated with 
null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
+      WritableColumnVector defLevels,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
-    readBatchInternal(state, values, values, valueReader, updater);
+    readBatchInternal(state, values, values, defLevels, valueReader, updater);
   }
 
   /**
-   * Decoding for dictionary ids. The IDs are populated into `values` and the 
nullability is
-   * populated into `nulls`.
+   * Decoding for dictionary ids. The IDs are populated into 'values' and the 
nullability is
+   * populated into 'nulls'.
    */
   public void readIntegers(
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
-      VectorizedValuesReader data) {
-    readBatchInternal(state, values, nulls, data, new 
ParquetVectorUpdaterFactory.IntegerUpdater());
+      WritableColumnVector defLevels,
+      VectorizedValuesReader valueReader) {
+    readBatchInternal(state, values, nulls, defLevels, valueReader,
+      new ParquetVectorUpdaterFactory.IntegerUpdater());
   }
 
   private void readBatchInternal(
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
+      WritableColumnVector defLevels,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
 
-    int offset = state.offset;
     long rowId = state.rowId;
-    int leftInBatch = state.valuesToReadInBatch;
+    int leftInBatch = state.rowsToReadInBatch;
     int leftInPage = state.valuesToReadInPage;
 
     while (leftInBatch > 0 && leftInPage > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
+      if (currentCount == 0 && !readNextGroup()) break;
       int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
 
       long rangeStart = state.currentRangeStart();
@@ -210,11 +215,11 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       } else if (rowId > rangeEnd) {
         state.nextRange();
       } else {
-        // the range [rowId, rowId + n) overlaps with the current row range in 
state
+        // The range [rowId, rowId + n) overlaps with the current row range in 
state
         long start = Math.max(rangeStart, rowId);
         long end = Math.min(rangeEnd, rowId + n - 1);
 
-        // skip the part [rowId, start)
+        // Skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
           skipValues(toSkip, state, valueReader, updater);
@@ -222,36 +227,347 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
           leftInPage -= toSkip;
         }
 
-        // read the part [start, end]
+        // Read the part [start, end]
         n = (int) (end - start + 1);
+        readValuesN(n, state, defLevels, values, nulls, valueReader, updater);
+
+        state.levelOffset += n;
+        leftInBatch -= n;
+        rowId += n;
+        leftInPage -= n;
+        currentCount -= n;
+        defLevels.addElementsAppended(n);
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
 
-        switch (mode) {
-          case RLE:
-            if (currentValue == state.maxDefinitionLevel) {
-              updater.readValues(n, offset, values, valueReader);
+  /**
+   * Reads a batch of repetition levels, definition levels and values into 
'repLevels',
+   * 'defLevels' and 'values' respectively. The definition levels and values 
are read via
+   * 'defLevelsReader' and 'valueReader' respectively.
+   * <p>
+   * The related states such as row index, offset, number of rows left in the 
batch and page,
+   * are tracked by 'state'. The type-specific 'updater' is used to update or 
skip values.
+   */
+  public void readBatchRepeated(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+    readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels, 
values, values, true,
+      valueReader, updater);
+  }
+
+  /**
+   * Reads a batch of repetition levels, definition levels and integer values 
into 'repLevels',
+   * 'defLevels', 'values' and 'nulls' respectively. The definition levels and 
values are read via
+   * 'defLevelsReader' and 'valueReader' respectively.
+   * <p>
+   * The 'values' vector is used to hold non-null values, while 'nulls' vector 
is used to hold
+   * null values.
+   * <p>
+   * The related states such as row index, offset, number of rows left in the 
batch and page,
+   * are tracked by 'state'.
+   * <p>
+   * Unlike 'readBatchRepeated', this is used to decode dictionary indices in 
dictionary encoding.
+   */
+  public void readIntegersRepeated(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader) {
+    readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels, 
values, nulls, false,
+      valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater());
+  }
+
+  /**
+   * Keep reading repetition level values from the page until either: 1) we've 
read enough
+   * top-level rows to fill the current batch, or 2) we've drained the data 
page completely.
+   *
+   * @param valuesReused whether 'values' vector is reused for 'nulls'
+   */
+  public void readBatchRepeatedInternal(
+      ParquetReadState state,
+      WritableColumnVector repLevels,
+      VectorizedRleValuesReader defLevelsReader,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      boolean valuesReused,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    int leftInBatch = state.rowsToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
+    long rowId = state.rowId;
+
+    DefLevelProcessor defLevelProcessor = new 
DefLevelProcessor(defLevelsReader, state, defLevels,
+      values, nulls, valuesReused, valueReader, updater);
+
+    while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+
+      // Values to read in the current RLE/PACKED block, must be <= what's 
left in the page
+      int valuesLeftInBlock = Math.min(leftInPage, currentCount);
+
+      // The current row range start and end
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      switch (mode) {
+        case RLE:
+          // This RLE block is consist of top-level rows, so we'll need to 
check
+          // if the rows should be skipped according to row indexes.
+          if (currentValue == 0) {
+            if (leftInBatch == 0) {
+              state.lastListCompleted = true;
             } else {
-              nulls.putNulls(offset, n);
+              // # of rows to read in the block, must be <= what's left in the 
current batch
+              int n = Math.min(leftInBatch, valuesLeftInBlock);
+
+              if (rowId + n < rangeStart) {
+                // Need to skip all rows in [rowId, rowId + n)
+                defLevelProcessor.skipValues(n);
+                rowId += n;
+                currentCount -= n;
+                leftInPage -= n;
+              } else if (rowId > rangeEnd) {
+                // The current row index already beyond the current range: 
move to the next range
+                // and repeat
+                state.nextRange();
+              } else {
+                // The range [rowId, rowId + n) overlaps with the current row 
range
+                long start = Math.max(rangeStart, rowId);
+                long end = Math.min(rangeEnd, rowId + n - 1);
+
+                // Skip the rows in [rowId, start)
+                int toSkip = (int) (start - rowId);
+                if (toSkip > 0) {
+                  defLevelProcessor.skipValues(toSkip);
+                  rowId += toSkip;
+                  currentCount -= toSkip;
+                  leftInPage -= toSkip;
+                }
+
+                // Read the rows in [start, end]
+                n = (int) (end - start + 1);
+
+                if (n > 0) {
+                  repLevels.appendInts(n, 0);
+                  defLevelProcessor.readValues(n);
+                }
+
+                rowId += n;
+                currentCount -= n;
+                leftInBatch -= n;
+                leftInPage -= n;
+              }
+            }
+          } else {
+            // Not a top-level row: just read all the repetition levels in the 
block if the row
+            // should be included according to row indexes, else skip the rows.
+            if (!state.shouldSkip) {
+              repLevels.appendInts(valuesLeftInBlock, currentValue);
             }
-            break;
-          case PACKED:
-            for (int i = 0; i < n; ++i) {
-              if (currentBuffer[currentBufferIdx++] == 
state.maxDefinitionLevel) {
-                updater.readValue(offset + i, values, valueReader);
+            state.numBatchedDefLevels += valuesLeftInBlock;
+            leftInPage -= valuesLeftInBlock;
+            currentCount -= valuesLeftInBlock;
+          }
+          break;
+        case PACKED:
+          int i = 0;
+
+          for (; i < valuesLeftInBlock; i++) {
+            int currentValue = currentBuffer[currentBufferIdx + i];
+            if (currentValue == 0) {
+              if (leftInBatch == 0) {
+                state.lastListCompleted = true;
+                break;
+              } else if (rowId < rangeStart) {
+                // This is a top-level row, therefore check if we should skip 
it with row indexes
+                // the row is before the current range, skip it
+                defLevelProcessor.skipValues(1);
+              } else if (rowId > rangeEnd) {
+                // The row is after the current range, move to the next range 
and compare again
+                state.nextRange();
+                break;
               } else {
-                nulls.putNull(offset + i);
+                // The row is in the current range, decrement the row counter 
and read it
+                leftInBatch--;
+                repLevels.appendInt(0);
+                defLevelProcessor.readValues(1);
+              }
+              rowId++;
+            } else {
+              if (!state.shouldSkip) {
+                repLevels.appendInt(currentValue);
               }
+              state.numBatchedDefLevels += 1;
             }
-            break;
+          }
+
+          leftInPage -= i;
+          currentCount -= i;
+          currentBufferIdx += i;
+          break;
+      }
+    }
+
+    // Process all the batched def levels
+    defLevelProcessor.finish();
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  private static class DefLevelProcessor {
+    private final VectorizedRleValuesReader reader;
+    private final ParquetReadState state;
+    private final WritableColumnVector defLevels;
+    private final WritableColumnVector values;
+    private final WritableColumnVector nulls;
+    private final boolean valuesReused;
+    private final VectorizedValuesReader valueReader;
+    private final ParquetVectorUpdater updater;
+
+    DefLevelProcessor(
+        VectorizedRleValuesReader reader,
+        ParquetReadState state,
+        WritableColumnVector defLevels,
+        WritableColumnVector values,
+        WritableColumnVector nulls,
+        boolean valuesReused,
+        VectorizedValuesReader valueReader,
+        ParquetVectorUpdater updater) {
+      this.reader = reader;
+      this.state = state;
+      this.defLevels = defLevels;
+      this.values = values;
+      this.nulls = nulls;
+      this.valuesReused = valuesReused;
+      this.valueReader = valueReader;
+      this.updater = updater;
+    }
+
+    void readValues(int n) {
+      if (!state.shouldSkip) {
+        state.numBatchedDefLevels += n;
+      } else {
+        reader.skipValues(state.numBatchedDefLevels, state, valueReader, 
updater);
+        state.numBatchedDefLevels = n;
+        state.shouldSkip = false;
+      }
+    }
+
+    void skipValues(int n) {
+      if (state.shouldSkip) {
+        state.numBatchedDefLevels += n;
+      } else {
+        reader.readValues(state.numBatchedDefLevels, state, defLevels, values, 
nulls, valuesReused,
+          valueReader, updater);
+        state.numBatchedDefLevels = n;
+        state.shouldSkip = true;
+      }
+    }
+
+    void finish() {
+      if (state.numBatchedDefLevels > 0) {
+        if (state.shouldSkip) {
+          reader.skipValues(state.numBatchedDefLevels, state, valueReader, 
updater);
+        } else {
+          reader.readValues(state.numBatchedDefLevels, state, defLevels, 
values, nulls,
+            valuesReused, valueReader, updater);
         }
-        offset += n;
-        leftInBatch -= n;
-        rowId += n;
-        leftInPage -= n;
-        currentCount -= n;
+        state.numBatchedDefLevels = 0;
       }
     }
+  }
+
+  /**
+   * Read the next 'total' values (either null or non-null) from this 
definition level reader and
+   * 'valueReader'. The definition levels are read into 'defLevels'. If a 
value is not
+   * null, it is appended to 'values'. Otherwise, a null bit will be set in 
'nulls'.
+   *
+   * This is only used when reading repeated values.
+   */
+  private void readValues(
+      int total,
+      ParquetReadState state,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      boolean valuesReused,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    defLevels.reserveAdditional(total);
+    values.reserveAdditional(total);
+    if (!valuesReused) {
+      // 'nulls' is a separate column vector so we'll have to reserve it 
separately
+      nulls.reserveAdditional(total);
+    }
+
+    int n = total;
+    int initialValueOffset = state.valueOffset;
+    while (n > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+      int num = Math.min(n, this.currentCount);
+      readValuesN(num, state, defLevels, values, nulls, valueReader, updater);
+      state.levelOffset += num;
+      currentCount -= num;
+      n -= num;
+    }
+
+    defLevels.addElementsAppended(total);
+
+    int valuesRead = state.valueOffset - initialValueOffset;
+    values.addElementsAppended(valuesRead);
+    if (!valuesReused) {
+      nulls.addElementsAppended(valuesRead);
+    }
+  }
 
-    state.advanceOffsetAndRowId(offset, rowId);
+  private void readValuesN(
+      int n,
+      ParquetReadState state,
+      WritableColumnVector defLevels,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+    switch (mode) {
+      case RLE:
+        if (currentValue == state.maxDefinitionLevel) {
+          updater.readValues(n, state.valueOffset, values, valueReader);
+        } else {
+          nulls.putNulls(state.valueOffset, n);
+        }
+        state.valueOffset += n;
+        defLevels.putInts(state.levelOffset, n, currentValue);
+        break;
+      case PACKED:
+        for (int i = 0; i < n; ++i) {
+          int currentValue = currentBuffer[currentBufferIdx++];
+          if (currentValue == state.maxDefinitionLevel) {
+            updater.readValue(state.valueOffset++, values, valueReader);
+          } else {
+            nulls.putNull(state.valueOffset++);
+          }
+          defLevels.putInt(state.levelOffset + i, currentValue);
+        }
+        break;
+    }
   }
 
   /**
@@ -264,11 +580,11 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       VectorizedValuesReader valuesReader,
       ParquetVectorUpdater updater) {
     while (n > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
+      if (currentCount == 0 && !readNextGroup()) break;
       int num = Math.min(n, this.currentCount);
       switch (mode) {
         case RLE:
-          // we only need to skip non-null values from `valuesReader` since 
nulls are represented
+          // We only need to skip non-null values from `valuesReader` since 
nulls are represented
           // via definition levels which are skipped here via decrementing 
`currentCount`.
           if (currentValue == state.maxDefinitionLevel) {
             updater.skipValues(num, valuesReader);
@@ -276,7 +592,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
           break;
         case PACKED:
           for (int i = 0; i < num; ++i) {
-            // same as above, only skip non-null values from `valuesReader`
+            // Same as above, only skip non-null values from `valuesReader`
             if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
               updater.skipValues(1, valuesReader);
             }
@@ -295,7 +611,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   public void readIntegers(int total, WritableColumnVector c, int rowId) {
     int left = total;
     while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
+      if (currentCount == 0 && !readNextGroup()) break;
       int n = Math.min(left, this.currentCount);
       switch (mode) {
         case RLE:
@@ -505,9 +821,14 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   }
 
   /**
-   * Reads the next group.
+   * Reads the next group. Returns false if no more group available.
    */
-  private void readNextGroup() {
+  private boolean readNextGroup() {
+    if (in.available() <= 0) {
+      currentCount = 0;
+      return false;
+    }
+
     try {
       int header = readUnsignedVarInt();
       this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
@@ -515,7 +836,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
         case RLE:
           this.currentCount = header >>> 1;
           this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-          return;
+          break;
         case PACKED:
           int numGroups = header >>> 1;
           this.currentCount = numGroups * 8;
@@ -531,13 +852,15 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
             this.packer.unpack8Values(buffer, buffer.position(), 
this.currentBuffer, valueIndex);
             valueIndex += 8;
           }
-          return;
+          break;
         default:
           throw new ParquetDecodingException("not a valid mode " + this.mode);
       }
     } catch (IOException e) {
       throw new ParquetDecodingException("Failed to read from input stream", 
e);
     }
+
+    return true;
   }
 
   /**
@@ -546,7 +869,7 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
   private void skipValues(int n) {
     int left = n;
     while (left > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
+      if (this.currentCount == 0 && !readNextGroup()) break;
       int num = Math.min(left, this.currentCount);
       switch (mode) {
         case RLE:
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index d246a3c..505377b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -127,7 +127,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public boolean isNullAt(int rowId) {
-    return nulls[rowId] == 1;
+    return isAllNull || nulls[rowId] == 1;
   }
 
   //
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index ae457a1..9ffb733 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -53,7 +53,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
    * Resets this column for writing. The currently stored values are no longer 
accessible.
    */
   public void reset() {
-    if (isConstant) return;
+    if (isConstant || isAllNull) return;
 
     if (childColumns != null) {
       for (WritableColumnVector c: childColumns) {
@@ -83,6 +83,10 @@ public abstract class WritableColumnVector extends 
ColumnVector {
     dictionary = null;
   }
 
+  public void reserveAdditional(int additionalCapacity) {
+    reserve(elementsAppended + additionalCapacity);
+  }
+
   public void reserve(int requiredCapacity) {
     if (requiredCapacity < 0) {
       throwUnsupportedException(requiredCapacity, null);
@@ -117,7 +121,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
 
   @Override
   public boolean hasNull() {
-    return numNulls > 0;
+    return isAllNull || numNulls > 0;
   }
 
   @Override
@@ -714,16 +718,48 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   public WritableColumnVector getChild(int ordinal) { return 
childColumns[ordinal]; }
 
   /**
-   * Returns the elements appended.
+   * Returns the number of child vectors.
+   */
+  public int getNumChildren() {
+    return childColumns.length;
+  }
+
+  /**
+   * Returns the elements appended. This is useful
    */
   public final int getElementsAppended() { return elementsAppended; }
 
   /**
+   * Increment number of elements appended by 'num'.
+   *
+   * This is useful when one wants to use the 'putXXX' API to add new elements 
to the vector, but
+   * still want to keep count of how many elements have been added (since the 
'putXXX' APIs don't
+   * increment count).
+   */
+  public final void addElementsAppended(int num) {
+    elementsAppended += num;
+  }
+
+  /**
    * Marks this column as being constant.
    */
   public final void setIsConstant() { isConstant = true; }
 
   /**
+   * Marks this column only contains null values.
+   */
+  public final void setAllNull() {
+    isAllNull = true;
+  }
+
+  /**
+   * Whether this column only contains null values.
+   */
+  public final boolean isAllNull() {
+    return isAllNull;
+  }
+
+  /**
    * Maximum number of rows that can be stored in this column.
    */
   protected int capacity;
@@ -746,6 +782,12 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   protected boolean isConstant;
 
   /**
+   * True if this column only contains nulls. This means the column values 
never change, even
+   * across resets. Comparing to 'isConstant' above, this doesn't require any 
allocation of space.
+   */
+  protected boolean isAllNull;
+
+  /**
    * Default size of each array length value. This grows as necessary.
    */
   protected static final int DEFAULT_ARRAY_LENGTH = 4;
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 18876de..44dc145 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -45,6 +45,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -173,8 +174,8 @@ class ParquetFileFormat
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
     conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
-      schema.forall(_.dataType.isInstanceOf[AtomicType])
+      ParquetUtils.isBatchReadSupportedForSchema(conf, schema) &&
+        !WholeStageCodegenExec.isTooManyFields(conf, schema)
   }
 
   override def vectorTypes(
@@ -240,8 +241,7 @@ class ParquetFileFormat
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-      resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+      ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 34a4eb8..0e065f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.util.Locale
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, 
PrimitiveColumnIO}
 import org.apache.parquet.schema._
@@ -92,10 +94,16 @@ class ParquetToSparkSchemaConverter(
   private def convertInternal(
       groupColumn: GroupColumnIO,
       sparkReadSchema: Option[StructType] = None): ParquetColumn = {
+    // First convert the read schema into a map from field name to the field 
itself, to avoid O(n)
+    // lookup cost below.
+    val schemaMapOpt = sparkReadSchema.map { schema =>
+      schema.map(f => normalizeFieldName(f.name) -> f).toMap
+    }
+
     val converted = (0 until groupColumn.getChildrenCount).map { i =>
       val field = groupColumn.getChild(i)
-      val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
-        schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
+      val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap =>
+        schemaMap.get(normalizeFieldName(field.getName))
       }
       var fieldReadType = fieldFromReadSchema.map(_.dataType)
 
@@ -146,9 +154,8 @@ class ParquetToSparkSchemaConverter(
     ParquetColumn(StructType(converted.map(_._1)), groupColumn, 
converted.map(_._2))
   }
 
-  private def isSameFieldName(left: String, right: String, caseSensitive: 
Boolean): Boolean =
-    if (!caseSensitive) left.equalsIgnoreCase(right)
-    else left == right
+  private def normalizeFieldName(name: String): String =
+    if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
 
   /**
    * Converts a Parquet [[Type]] to a [[ParquetColumn]] which wraps a Spark 
SQL [[DataType]] with
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 2c565c8..9f2e658 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -36,8 +36,9 @@ import 
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, 
Count, CountStar, Max, Min}
 import org.apache.spark.sql.execution.datasources.AggregatePushDownUtils
 import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, 
PARQUET_AGGREGATE_PUSHDOWN_ENABLED}
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, 
StructType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, 
StructField, StructType}
 
 object ParquetUtils {
   def inferSchema(
@@ -188,6 +189,30 @@ object ParquetUtils {
   }
 
   /**
+   * Whether columnar read is supported for the input `schema`.
+   */
+  def isBatchReadSupportedForSchema(sqlConf: SQLConf, schema: StructType): 
Boolean =
+    sqlConf.parquetVectorizedReaderEnabled &&
+      schema.forall(f => isBatchReadSupported(sqlConf, f.dataType))
+
+  def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match 
{
+    case _: AtomicType =>
+      true
+    case at: ArrayType =>
+      sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+        isBatchReadSupported(sqlConf, at.elementType)
+    case mt: MapType =>
+      sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+        isBatchReadSupported(sqlConf, mt.keyType) &&
+        isBatchReadSupported(sqlConf, mt.valueType)
+    case st: StructType =>
+      sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
+        st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType))
+    case _ =>
+      false
+  }
+
+  /**
    * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, 
we don't need to
    * createRowBaseReader to read data from Parquet and aggregate at Spark 
layer. Instead we want
    * to get the partial aggregates (Max/Min/Count) result using the statistics 
information
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 12b8a63..ea4f5e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -70,8 +70,8 @@ case class ParquetPartitionReaderFactory(
   private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
   private val resultSchema = StructType(partitionSchema.fields ++ 
readDataSchema.fields)
   private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-  private val enableVectorizedReader: Boolean = 
sqlConf.parquetVectorizedReaderEnabled &&
-    resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+  private val enableVectorizedReader: Boolean =
+    ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
   private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
   private val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
   private val capacity = sqlConf.parquetVectorizedReaderBatchSize
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index f5e5b46..f98fb1e 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1125,7 +1125,8 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 *Filter v#x IN ([a],null)
-+- FileScan parquet default.t[v#x] Batched: false, DataFilters: [v#x IN 
([a],null)], Format: Parquet, Location [not included in 
comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, 
[[a],null])], ReadSchema: struct<v:array<string>>
++- *ColumnarToRow
+   +- FileScan parquet default.t[v#x] Batched: true, DataFilters: [v#x IN 
([a],null)], Format: Parquet, Location [not included in 
comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, 
[[a],null])], ReadSchema: struct<v:array<string>>
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 4e552d5..a563eda 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1067,7 +1067,8 @@ struct<plan:string>
 -- !query output
 == Physical Plan ==
 *Filter v#x IN ([a],null)
-+- FileScan parquet default.t[v#x] Batched: false, DataFilters: [v#x IN 
([a],null)], Format: Parquet, Location [not included in 
comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, 
[[a],null])], ReadSchema: struct<v:array<string>>
++- *ColumnarToRow
+   +- FileScan parquet default.t[v#x] Batched: true, DataFilters: [v#x IN 
([a],null)], Format: Parquet, Location [not included in 
comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, 
[[a],null])], ReadSchema: struct<v:array<string>>
 
 
 -- !query
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
index c2dc20b..a154a65 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
@@ -38,6 +38,8 @@ private[sql] trait FileBasedDataSourceTest extends 
SQLTestUtils {
   protected val dataSourceName: String
   // The SQL config key for enabling vectorized reader.
   protected val vectorizedReaderEnabledKey: String
+  // The SQL config key for enabling vectorized reader for nested types.
+  protected val vectorizedReaderNestedEnabledKey: String
 
   /**
    * Reads data source file from given `path` as `DataFrame` and passes it to 
given function.
@@ -52,8 +54,11 @@ private[sql] trait FileBasedDataSourceTest extends 
SQLTestUtils {
       f(spark.read.format(dataSourceName).load(path.toString))
     }
     if (testVectorized) {
-      withSQLConf(vectorizedReaderEnabledKey -> "true") {
-        f(spark.read.format(dataSourceName).load(path.toString))
+      Seq(true, false).foreach { enableNested =>
+        withSQLConf(vectorizedReaderEnabledKey -> "true",
+            vectorizedReaderNestedEnabledKey -> enableNested.toString) {
+          f(spark.read.format(dataSourceName).load(path))
+        }
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index c36bfd9..46a7f8d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -56,6 +56,8 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest 
with BeforeAndAfter
   override protected val dataSourceName: String = "orc"
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.ORC_VECTORIZED_READER_ENABLED.key
+  override protected val vectorizedReaderNestedEnabledKey: String =
+    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala
index 2ce38da..4d33eac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala
@@ -25,6 +25,8 @@ class OrcV1SchemaPruningSuite extends SchemaPruningSuite {
   override protected val dataSourceName: String = "orc"
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.ORC_VECTORIZED_READER_ENABLED.key
+  override protected val vectorizedReaderNestedEnabledKey: String =
+    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
   override protected def sparkConf: SparkConf =
     super
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
index 47254f4..107a2b7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala
@@ -29,6 +29,8 @@ class OrcV2SchemaPruningSuite extends SchemaPruningSuite with 
AdaptiveSparkPlanH
   override protected val dataSourceName: String = "orc"
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.ORC_VECTORIZED_READER_ENABLED.key
+  override protected val vectorizedReaderNestedEnabledKey: String =
+    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
   override protected def sparkConf: SparkConf =
     super
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
index bdcc1a4..64bfcda 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
@@ -102,4 +102,17 @@ class ParquetColumnIndexSuite extends QueryTest with 
ParquetTest with SharedSpar
     }.toDF()
     checkUnalignedPages(df)(actions: _*)
   }
+
+  test("reading unaligned pages - struct type") {
+    val df = (0 until 2000).map(i => Tuple1((i.toLong, i + ":" + "o" * (i / 
100)))).toDF("s")
+    checkUnalignedPages(df)(
+      df => df.filter("s._1 = 500"),
+      df => df.filter("s._1 = 500 or s._1 = 1500"),
+      df => df.filter("s._1 = 500 or s._1 = 501 or s._1 = 1500"),
+      df => df.filter("s._1 = 500 or s._1 = 501 or s._1 = 1000 or s._1 = 
1500"),
+      // range filter
+      df => df.filter("s._1 >= 500 and s._1 < 1000"),
+      df => df.filter("(s._1 >= 500 and s._1 < 1000) or (s._1 >= 1500 and s._1 
< 1600)")
+    )
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
index 4eab5c3..a9a8dac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -89,6 +89,43 @@ abstract class ParquetFileFormatSuite
       }
     }
   }
+
+  test("support batch reads for schema") {
+    val testUDT = new TestUDT.MyDenseVectorUDT
+    Seq(true, false).foreach { enabled =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key 
-> enabled.toString) {
+        Seq(
+          Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) 
-> true,
+          Seq(StructField("f1", IntegerType), StructField("f2", 
ArrayType(IntegerType))) -> enabled,
+          Seq(StructField("f1", BooleanType), StructField("f2", testUDT)) -> 
false
+        ).foreach { case (schema, expected) =>
+          assert(ParquetUtils.isBatchReadSupportedForSchema(conf, 
StructType(schema)) == expected)
+        }
+      }
+    }
+  }
+
+  test("support batch reads for data type") {
+    val testUDT = new TestUDT.MyDenseVectorUDT
+    Seq(true, false).foreach { enabled =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key 
-> enabled.toString) {
+        Seq(
+          IntegerType -> true,
+          BooleanType -> true,
+          ArrayType(TimestampType) -> enabled,
+          StructType(Seq(StructField("f1", DecimalType.SYSTEM_DEFAULT),
+            StructField("f2", StringType))) -> enabled,
+          MapType(keyType = LongType, valueType = DateType) -> enabled,
+          testUDT -> false,
+          ArrayType(testUDT) -> false,
+          StructType(Seq(StructField("f1", ByteType), StructField("f2", 
testUDT))) -> false,
+          MapType(keyType = testUDT, valueType = BinaryType) -> false
+        ).foreach { case (dt, expected) =>
+          assert(ParquetUtils.isBatchReadSupported(conf, dt) == expected)
+        }
+      }
+    }
+  }
 }
 
 class ParquetFileFormatV1Suite extends ParquetFileFormatSuite {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 99b2d98..4d01db9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -358,6 +358,357 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     }
   }
 
+  test("vectorized reader: array") {
+    val data = Seq(
+      Tuple1(null),
+      Tuple1(Seq()),
+      Tuple1(Seq("a", "b", "c")),
+      Tuple1(Seq(null))
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df.sort("_1"),
+          Row(null) :: Row(Seq()) :: Row(Seq(null)) :: Row(Seq("a", "b", "c")) 
:: Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: missing array") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true") {
+      val data = Seq(
+        Tuple1(null),
+        Tuple1(Seq()),
+        Tuple1(Seq("a", "b", "c")),
+        Tuple1(Seq(null))
+      )
+
+      val readSchema = new StructType().add("_2", new ArrayType(
+        new StructType().add("a", LongType, nullable = true),
+        containsNull = true)
+      )
+
+      withParquetFile(data) { file =>
+        checkAnswer(spark.read.schema(readSchema).parquet(file),
+          Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: array of array") {
+    val data = Seq(
+      Tuple1(Seq(Seq(0, 1), Seq(2, 3))),
+      Tuple1(Seq(Seq(4, 5), Seq(6, 7)))
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df.sort("_1"),
+          Row(Seq(Seq(0, 1), Seq(2, 3))) :: Row(Seq(Seq(4, 5), Seq(6, 7))) :: 
Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: struct of array") {
+    val data = Seq(
+      Tuple1(Tuple2("a", null)),
+      Tuple1(null),
+      Tuple1(Tuple2(null, null)),
+      Tuple1(Tuple2(null, Seq("b", "c"))),
+      Tuple1(Tuple2("d", Seq("e", "f"))),
+      Tuple1(null)
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df,
+          Row(Row("a", null)) :: Row(null) :: Row(Row(null, null)) ::
+              Row(Row(null, Seq("b", "c"))) :: Row(Row("d", Seq("e", "f"))) :: 
Row(null) :: Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: array of struct") {
+    val data = Seq(
+      Tuple1(null),
+      Tuple1(Seq()),
+      Tuple1(Seq(Tuple2("a", null), Tuple2(null, "b"))),
+      Tuple1(Seq(null)),
+      Tuple1(Seq(Tuple2(null, null), Tuple2("c", null), null)),
+      Tuple1(Seq())
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df,
+          Row(null) ::
+              Row(Seq()) ::
+              Row(Seq(Row("a", null), Row(null, "b"))) ::
+              Row(Seq(null)) ::
+              Row(Seq(Row(null, null), Row("c", null), null)) ::
+              Row(Seq()) ::
+              Nil)
+      }
+    }
+  }
+
+
+  test("vectorized reader: array of nested struct") {
+    val data = Seq(
+      Tuple1(Tuple2("a", null)),
+      Tuple1(Tuple2("b", Seq(Tuple2("c", "d")))),
+      Tuple1(null),
+      Tuple1(Tuple2("e", Seq(Tuple2("f", null), Tuple2(null, "g")))),
+      Tuple1(Tuple2(null, null)),
+      Tuple1(Tuple2(null, Seq(null))),
+      Tuple1(Tuple2(null, Seq(Tuple2(null, null), Tuple2("h", null), null))),
+      Tuple1(Tuple2("i", Seq())),
+      Tuple1(null)
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df,
+          Row(Row("a", null)) ::
+              Row(Row("b", Seq(Row("c", "d")))) ::
+              Row(null) ::
+              Row(Row("e", Seq(Row("f", null), Row(null, "g")))) ::
+              Row(Row(null, null)) ::
+              Row(Row(null, Seq(null))) ::
+              Row(Row(null, Seq(Row(null, null), Row("h", null), null))) ::
+              Row(Row("i", Seq())) ::
+              Row(null) ::
+              Nil)
+      }
+    }
+  }
+
+  test("vectorized reader: required array with required elements") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = {
+        val schemaStr =
+          """message spark_schema {
+            |  required group _1 (LIST) {
+            |    repeated group list {
+            |      required binary element (UTF8);
+            |    }
+            |  }
+            |}
+             """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        val factory = new SimpleGroupFactory(schema)
+        expected.foreach { values =>
+          val group = factory.newGroup()
+          val list = group.addGroup(0)
+          values.foreach { value =>
+            list.addGroup(0).append("element", value)
+          }
+          writer.write(group)
+        }
+        writer.close()
+      }
+
+      // write the following into the Parquet file:
+      //   0: [ "a", "b" ]
+      //   1: [ ]
+      //   2: [ "c", "d" ]
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        val expected = Seq(Seq("a", "b"), Seq(), Seq("c", "d"))
+        makeRawParquetFile(path, expected)
+        readParquetFile(path.toString) { df => checkAnswer(df, 
expected.map(Row(_))) }
+      }
+    }
+  }
+
+  test("vectorized reader: optional array with required elements") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = {
+        val schemaStr =
+          """message spark_schema {
+            |  optional group _1 (LIST) {
+            |    repeated group list {
+            |      required binary element (UTF8);
+            |    }
+            |  }
+            |}
+             """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        val factory = new SimpleGroupFactory(schema)
+        expected.foreach { values =>
+          val group = factory.newGroup()
+          if (values != null) {
+            val list = group.addGroup(0)
+            values.foreach { value =>
+              list.addGroup(0).append("element", value)
+            }
+          }
+          writer.write(group)
+        }
+        writer.close()
+      }
+
+      // write the following into the Parquet file:
+      //   0: [ "a", "b" ]
+      //   1: null
+      //   2: [ "c", "d" ]
+      //   3: [ ]
+      //   4: [ "e", "f" ]
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        val expected = Seq(Seq("a", "b"), null, Seq("c", "d"), Seq(), Seq("e", 
"f"))
+        makeRawParquetFile(path, expected)
+        readParquetFile(path.toString) { df => checkAnswer(df, 
expected.map(Row(_))) }
+      }
+    }
+  }
+
+  test("vectorized reader: required array with optional elements") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = {
+        val schemaStr =
+          """message spark_schema {
+            |  required group _1 (LIST) {
+            |    repeated group list {
+            |      optional binary element (UTF8);
+            |    }
+            |  }
+            |}
+             """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        val factory = new SimpleGroupFactory(schema)
+        expected.foreach { values =>
+          val group = factory.newGroup()
+          if (values != null) {
+            val list = group.addGroup(0)
+            values.foreach { value =>
+              val group = list.addGroup(0)
+              if (value != null) group.append("element", value)
+            }
+          }
+          writer.write(group)
+        }
+        writer.close()
+      }
+
+      // write the following into the Parquet file:
+      //   0: [ "a", null ]
+      //   3: [ ]
+      //   4: [ null, "b" ]
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        val expected = Seq(Seq("a", null), Seq(), Seq(null, "b"))
+        makeRawParquetFile(path, expected)
+        readParquetFile(path.toString) { df => checkAnswer(df, 
expected.map(Row(_))) }
+      }
+    }
+  }
+
+  test("vectorized reader: required array with legacy format") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = {
+        val schemaStr =
+          """message spark_schema {
+            |  repeated binary element (UTF8);
+            |}
+             """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+        val writer = createParquetWriter(schema, path, dictionaryEnabled)
+
+        val factory = new SimpleGroupFactory(schema)
+        expected.foreach { values =>
+          val group = factory.newGroup()
+          values.foreach(group.append("element", _))
+          writer.write(group)
+        }
+        writer.close()
+      }
+
+      // write the following into the Parquet file:
+      //   0: [ "a", "b" ]
+      //   3: [ ]
+      //   4: [ "c", "d" ]
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        val expected = Seq(Seq("a", "b"), Seq(), Seq("c", "d"))
+        makeRawParquetFile(path, expected)
+        readParquetFile(path.toString) { df => checkAnswer(df, 
expected.map(Row(_))) }
+      }
+    }
+  }
+
+  test("vectorized reader: struct") {
+    val data = Seq(
+      Tuple1(null),
+      Tuple1((1, "a")),
+      Tuple1((2, null)),
+      Tuple1((3, "b")),
+      Tuple1(null)
+    )
+
+    withParquetFile(data) { file =>
+      readParquetFile(file) { df =>
+        checkAnswer(df.sort("_1"),
+          Row(null) :: Row(null) :: Row(Row(1, "a")) :: Row(Row(2, null)) :: 
Row(Row(3, "b")) :: Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: missing all struct fields") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true") {
+      val data = Seq(
+        Tuple1((1, "a")),
+        Tuple1((2, null)),
+        Tuple1(null)
+      )
+
+      val readSchema = new StructType().add("_1",
+        new StructType()
+            .add("_3", IntegerType, nullable = true)
+            .add("_4", LongType, nullable = true),
+        nullable = true)
+
+      withParquetFile(data) { file =>
+        checkAnswer(spark.read.schema(readSchema).parquet(file),
+          Row(null) :: Row(null) :: Row(null) :: Nil
+        )
+      }
+    }
+  }
+
+  test("vectorized reader: missing some struct fields") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true") {
+      val data = Seq(
+        Tuple1((1, "a")),
+        Tuple1((2, null)),
+        Tuple1(null)
+      )
+
+      val readSchema = new StructType().add("_1",
+        new StructType()
+            .add("_1", IntegerType, nullable = true)
+            .add("_3", LongType, nullable = true),
+        nullable = true)
+
+      withParquetFile(data) { file =>
+        checkAnswer(spark.read.schema(readSchema).parquet(file),
+          Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil
+        )
+      }
+    }
+  }
+
   test("SPARK-34817: Support for unsigned Parquet logical types") {
     val parquetSchema = MessageTypeParser.parseMessageType(
       """message root {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index cab93bd..6a93b72 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -31,6 +31,8 @@ abstract class ParquetSchemaPruningSuite extends 
SchemaPruningSuite with Adaptiv
   override protected val dataSourceName: String = "parquet"
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key
+  override protected val vectorizedReaderNestedEnabledKey: String =
+    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 1869084..9eca308 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -51,6 +51,8 @@ private[sql] trait ParquetTest extends 
FileBasedDataSourceTest {
   override protected val dataSourceName: String = "parquet"
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key
+  override protected val vectorizedReaderNestedEnabledKey: String =
+    SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
   /**
    * Reads the parquet file at `path`
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala
index 36a52b6..9450918 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala
@@ -195,6 +195,290 @@ class ParquetVectorizedSuite extends QueryTest with 
ParquetTest with SharedSpark
     }
   }
 
+  test("nested type - single page, no column index") {
+    (1 to 4).foreach { batchSize =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        testNestedStringArrayOneLevel(None, None, Seq(4),
+          Seq(Seq("a", "b", "c", "d")),
+          Seq(0, 1, 1, 1), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(4),
+          Seq(Seq("a", "b"), Seq("c", "d")),
+          Seq(0, 1, 0, 1), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(4),
+          Seq(Seq("a"), Seq("b"), Seq("c"), Seq("d")),
+          Seq(0, 0, 0, 0), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(4),
+          Seq(Seq("a"), Seq(null), Seq("c"), Seq(null)),
+          Seq(0, 0, 0, 0), Seq(3, 2, 3, 2), Seq("a", null, "c", null), 
batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(4),
+          Seq(Seq("a"), Seq(null, null, null)),
+          Seq(0, 0, 1, 1), Seq(3, 2, 2, 2), Seq("a", null, null, null), 
batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(6),
+          Seq(Seq("a"), Seq(null, null, null), null, Seq()),
+          Seq(0, 0, 1, 1, 0, 0), Seq(3, 2, 2, 2, 0, 1), Seq("a", null, null, 
null, null, null),
+          batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(8),
+          Seq(Seq("a"), Seq(), Seq(), null, Seq("b", null, "c"), null),
+          Seq(0, 0, 0, 0, 0, 1, 1, 0), Seq(3, 1, 1, 0, 3, 2, 3, 0),
+          Seq("a", null, null, null, "b", null, "c", null), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+      }
+    }
+  }
+
+  test("nested type - multiple page, no column index") {
+    BATCH_SIZE_CONFIGS.foreach { batchSize =>
+      Seq(Seq(2, 3, 2, 3)).foreach { pageSizes =>
+        Seq(true, false).foreach { dictionaryEnabled =>
+          testNestedStringArrayOneLevel(None, None, pageSizes,
+            Seq(Seq("a"), Seq(), Seq("b", null, "c"), Seq("d", "e"), 
Seq(null), Seq(), null),
+            Seq(0, 0, 0, 1, 1, 0, 1, 0, 0, 0), Seq(3, 1, 3, 2, 3, 3, 3, 2, 1, 
0),
+            Seq("a", null, "b", null, "c", "d", "e", null, null, null), 
batchSize,
+            dictionaryEnabled = dictionaryEnabled)
+        }
+      }
+    }
+  }
+
+  test("nested type - multiple page, no column index, batch span multiple 
pages") {
+    (1 to 6).foreach { batchSize =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        // a list across multiple pages
+        testNestedStringArrayOneLevel(None, None, Seq(1, 5),
+          Seq(Seq("a"), Seq("b", "c", "d", "e", "f")),
+          Seq(0, 0, 1, 1, 1, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", 
"f"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(1, 3, 2),
+          Seq(Seq("a"), Seq("b", "c", "d"), Seq("e", "f")),
+          Seq(0, 0, 1, 1, 0, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", 
"f"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+
+        testNestedStringArrayOneLevel(None, None, Seq(2, 2, 2),
+          Seq(Seq("a", "b"), Seq("c", "d"), Seq("e", "f")),
+          Seq(0, 1, 0, 1, 0, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", 
"f"), batchSize,
+          dictionaryEnabled = dictionaryEnabled)
+      }
+    }
+  }
+
+  test("nested type - RLE encoding") {
+    (1 to 8).foreach { batchSize =>
+      Seq(Seq(26), Seq(4, 3, 11, 4, 4), Seq(18, 8)).foreach { pageSizes =>
+        Seq(true, false).foreach { dictionaryEnabled =>
+          testNestedStringArrayOneLevel(None, None, pageSizes,
+            (0 to 6).map(i => Seq(('a' + i).toChar.toString)) ++
+                Seq((7 to 17).map(i => ('a' + i).toChar.toString)) ++
+                (18 to 25).map(i => Seq(('a' + i).toChar.toString)),
+            Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), 
Seq.fill(26)(3),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+        }
+      }
+    }
+  }
+
+  test("nested type - column index with ranges") {
+    (1 to 8).foreach { batchSize =>
+      Seq(Seq(8), Seq(6, 2), Seq(1, 5, 2)).foreach { pageSizes =>
+        Seq(true, false).foreach { dictionaryEnabled =>
+          var ranges = Seq((1L, 2L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("b", "c", "d", "e", "f"), Seq("g", "h")),
+            Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3),
+            Seq("a", "b", "c", "d", "e", "f", "g", "h"),
+            batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((3L, 5L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(),
+            Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3),
+            Seq("a", "b", "c", "d", "e", "f", "g", "h"),
+            batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((0L, 0L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("a")),
+            Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3),
+            Seq("a", "b", "c", "d", "e", "f", "g", "h"),
+            batchSize, dictionaryEnabled = dictionaryEnabled)
+        }
+      }
+    }
+  }
+
+  test("nested type - column index with ranges and RLE encoding") {
+    BATCH_SIZE_CONFIGS.foreach { batchSize =>
+      Seq(Seq(26), Seq(4, 3, 11, 4, 4), Seq(18, 8)).foreach { pageSizes =>
+        Seq(true, false).foreach { dictionaryEnabled =>
+          var ranges = Seq((0L, 2L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("a"), Seq("b"), Seq("c")),
+            Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), 
Seq.fill(26)(3),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((4L, 6L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("e"), Seq("f"), Seq("g")),
+            Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), 
Seq.fill(26)(3),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((6L, 9L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("g")) ++ Seq((7 to 17).map(i => ('a' + 
i).toChar.toString)) ++
+                Seq(Seq("s"), Seq("t")),
+            Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), 
Seq.fill(26)(3),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((4L, 6L), (14L, 20L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("e"), Seq("f"), Seq("g"), Seq("y"), Seq("z")),
+            Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), 
Seq.fill(26)(3),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+        }
+      }
+    }
+  }
+
+  test("nested type - column index with ranges and nulls") {
+    BATCH_SIZE_CONFIGS.foreach { batchSize =>
+      Seq(Seq(16), Seq(8, 8), Seq(4, 4, 4, 4), Seq(2, 6, 4, 4)).foreach { 
pageSizes =>
+        Seq(true, false).foreach { dictionaryEnabled =>
+          testNestedStringArrayOneLevel(None, None, pageSizes,
+            Seq(Seq("a", null), Seq("c", "d"), Seq(), Seq("f", null, "h"),
+              Seq("i", "j", "k", null), Seq(), null, null, Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          var ranges = Seq((0L, 15L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("a", null), Seq("c", "d"), Seq(), Seq("f", null, "h"),
+              Seq("i", "j", "k", null), Seq(), null, null, Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((0L, 2L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("a", null), Seq("c", "d"), Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((3L, 7L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("f", null, "h"), Seq("i", "j", "k", null), Seq(), null, 
null),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((5, 12L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq(), null, null, Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((5, 12L))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq(), null, null, Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+          ranges = Seq((0L, 0L), (2, 3), (5, 7), (8, 10))
+          testNestedStringArrayOneLevel(None, Some(ranges), pageSizes,
+            Seq(Seq("a", null), Seq(), Seq("f", null, "h"), Seq(), null, null, 
Seq()),
+            Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+            Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+            (0 to 15),
+            batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+        }
+      }
+    }
+  }
+
+  test("nested type - column index with ranges, nulls and first row indexes") {
+    BATCH_SIZE_CONFIGS.foreach { batchSize =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        val pageSizes = Seq(4, 4, 4, 4)
+        var firstRowIndexes = Seq(10L, 20, 30, 40)
+        var ranges = Seq((0L, 5L))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((5L, 15))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(Seq("a", null), Seq("c", "d")),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((25, 28))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((35, 45))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(Seq(), null, null, Seq()),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((45, 55))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((45, 55))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+
+        ranges = Seq((15, 29), (31, 35))
+        testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), 
pageSizes,
+          Seq(Seq(), Seq("f", null, "h")),
+          Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0),
+          Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1),
+          (0 to 15),
+          batchSize = batchSize, dictionaryEnabled = dictionaryEnabled)
+      }
+    }
+  }
+
   private def testPrimitiveString(
       firstRowIndexesOpt: Option[Seq[Long]],
       rangesOpt: Option[Seq[(Long, Long)]],
@@ -236,6 +520,52 @@ class ParquetVectorizedSuite extends QueryTest with 
ParquetTest with SharedSpark
         rangesOpt), expectedValues.map(i => Row(i)), batchSize)
   }
 
+  private def testNestedStringArrayOneLevel(
+      firstRowIndexesOpt: Option[Seq[Long]],
+      rangesOpt: Option[Seq[(Long, Long)]],
+      pageSizes: Seq[Int],
+      expected: Seq[Seq[String]],
+      rls: Seq[Int],
+      dls: Seq[Int],
+      values: Seq[String] = VALUES,
+      batchSize: Int,
+      dictionaryEnabled: Boolean = false): Unit = {
+    assert(pageSizes.sum == rls.length && rls.length == dls.length)
+    firstRowIndexesOpt.foreach(a => assert(pageSizes.length == a.length))
+
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      s"""message root {
+         |  optional group _1 (LIST) {
+         |    repeated group list {
+         |      optional binary a(UTF8);
+         |    }
+         |  }
+         |}
+         |""".stripMargin
+    )
+
+    val maxRepLevel = 1
+    val maxDefLevel = 3
+    val ty = parquetSchema.getType("_1", "list", "a").asPrimitiveType()
+    val cd = new ColumnDescriptor(Seq("_1", "list", "a").toArray, ty, 
maxRepLevel, maxDefLevel)
+
+    var i = 0
+    var numRows = 0
+    val memPageStore = new MemPageStore(expected.length)
+    val pageFirstRowIndexes = ArrayBuffer.empty[Long]
+    pageSizes.foreach { size =>
+      pageFirstRowIndexes += numRows
+      numRows += rls.slice(i, i + size).count(_ == 0)
+      writeDataPage(cd, memPageStore, rls.slice(i, i + size), dls.slice(i, i + 
size),
+        values.slice(i, i + size), maxDefLevel, dictionaryEnabled)
+      i += size
+    }
+
+    checkAnswer(expected.length, parquetSchema,
+      TestPageReadStore(memPageStore, 
firstRowIndexesOpt.getOrElse(pageFirstRowIndexes).toSeq,
+        rangesOpt), expected.map(i => Row(i)), batchSize)
+  }
+
   /**
    * Write a single data page using repetition levels, definition levels and 
values provided.
    *

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to