Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22880#discussion_r229208839
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
---
@@ -49,34 +49,82 @@ import org.apache.spark.sql.types._
 * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
 * to [[prepareForRead()]], but use a private `var` for simplicity.
*/
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+    usingVectorizedReader: Boolean)
  extends ReadSupport[UnsafeRow] with Logging {
  private var catalystRequestedSchema: StructType = _

  def this() {
    // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
    // used in the vectorized reader, where we get the convertTz value directly, and the value here
    // is ignored.
-    this(None)
+    this(None, usingVectorizedReader = true)
  }
  /**
   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
   * readers. Responsible for figuring out Parquet requested schema used for column pruning.
   */
  override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
    catalystRequestedSchema = {
-      val conf = context.getConfiguration
      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
      assert(schemaString != null, "Parquet requested schema not set.")
      StructType.fromString(schemaString)
    }

-    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+    val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
+    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
      SQLConf.CASE_SENSITIVE.defaultValue.get)
-    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
-      context.getFileSchema, catalystRequestedSchema, caseSensitive)
-
+    val parquetFileSchema = context.getFileSchema
+    val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
+      catalystRequestedSchema, caseSensitive)
+
+    // As part of schema clipping, we add fields in catalystRequestedSchema which are missing
+    // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires
+    // that we ignore unrequested field data when reading from a Parquet file. Therefore we pass
+    // two schemas to ParquetRecordMaterializer: the schema of the file data we want to read
+    // (parquetRequestedSchema), and the schema of the rows we want to return
+    // (catalystRequestedSchema). The reader is responsible for reconciling the differences
+    // between the two.
+    //
+    // Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there
+    // is an additional complication to constructing parquetRequestedSchema: the manner in which
+    // Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and
+    // catalystRequestedSchema differs. Spark's vectorized reader does not (currently) support
+    // reading Parquet files with complex types in their schema. Further, it assumes that
+    // parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It includes
+    // logic in its read path to skip fields in parquetRequestedSchema which are not present in
+    // the file.
+    //
+    // Spark's parquet-mr based reader supports reading Parquet files with any kind of complex
+    // schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the
+    // parquet-mr reader requires that parquetRequestedSchema include only those fields present in
+    // the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr reader,
+    // we intersect the parquetClippedSchema with the parquetFileSchema to construct the
+    // parquetRequestedSchema set in the ReadContext.
--- End diff --
For the vectorized reader, even if we do this additional `intersectParquetGroups`, will it cause any problem?
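
A minimal, self-contained sketch of the clipped-vs-intersected distinction described in the comment above (this is only an illustration: the `intersect` helper and object name are hypothetical, not Spark's actual `intersectParquetGroups`, and Catalyst `StructType`s stand in for Parquet `MessageType`s):

```scala
// Sketch only: contrasts the clipped schema (keeps requested-but-missing fields)
// with an intersected schema (keeps only fields present in the file).
import org.apache.spark.sql.types._

object SchemaIntersectionSketch {
  // Hypothetical helper: keep only the requested top-level fields that exist in the file schema.
  def intersect(fileSchema: StructType, requestedSchema: StructType): StructType =
    StructType(requestedSchema.filter(f => fileSchema.fieldNames.contains(f.name)))

  def main(args: Array[String]): Unit = {
    val fileSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // The query also asks for a column that is absent from this particular file.
    val requestedSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("missingCol", IntegerType)))

    // Clipping keeps the missing field, which the vectorized reader skips when it
    // is absent from the file.
    val clippedSchema = requestedSchema

    // The parquet-mr path instead wants only the fields actually present in the file.
    val intersectedSchema = intersect(fileSchema, requestedSchema)

    println(clippedSchema.simpleString)      // struct<id:bigint,missingCol:int>
    println(intersectedSchema.simpleString)  // struct<id:bigint>
  }
}
```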
---