GitHub user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8553#discussion_r38617544
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---
    @@ -32,31 +32,56 @@ import org.apache.spark.Logging
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.types._
     
    +/**
    + * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
    + * [[InternalRow]]s.
    + *
    + * The API of [[ReadSupport]] is a bit over-complicated for historical reasons.  In older
    + * versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needs to be instantiated
    + * and initialized twice, on both the driver side and the executor side.  The [[init()]] method
    + * is for driver-side initialization, while [[prepareForRead()]] is for the executor side.
    + * However, starting from parquet-mr 1.6.0, this is no longer the case: [[ReadSupport]] is only
    + * instantiated and initialized on the executor side.  So, theoretically, it is now fine to
    + * combine these two methods into a single initialization method.  The only reason (that I can
    + * think of) to keep them separate is parquet-mr API backwards-compatibility.
    + *
    + * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
    + * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
    + */
     private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
    -  // Called after `init()` when initializing Parquet record reader.
    +  private var catalystRequestedSchema: StructType = _
    +
    +  /**
    +   * Called on the executor side before [[prepareForRead()]] and before instantiating actual
    +   * Parquet record readers.  Responsible for figuring out the Parquet requested schema used
    +   * for column pruning.
    +   */
    +  override def init(context: InitContext): ReadContext = {
    +    catalystRequestedSchema = {
    +      val conf = context.getConfiguration
    +      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
    +      assert(schemaString != null, "Parquet requested schema not set.")
    +      StructType.fromString(schemaString)
    +    }
    +
    +    val parquetRequestedSchema =
    +      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
    +
    +    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
    +  }
    +
    +  /**
    +   * Called on the executor side after [[init()]], before instantiating actual Parquet record
    +   * readers.  Responsible for instantiating [[RecordMaterializer]], which is used for
    +   * converting Parquet records into Catalyst [[InternalRow]]s.
    +   */
       override def prepareForRead(
           conf: Configuration,
           keyValueMetaData: JMap[String, String],
           fileSchema: MessageType,
           readContext: ReadContext): RecordMaterializer[InternalRow] = {
         log.debug(s"Preparing for read Parquet file with message type: 
$fileSchema")
    -
    -    val toCatalyst = new CatalystSchemaConverter(conf)
         val parquetRequestedSchema = readContext.getRequestedSchema
     
    -    val catalystRequestedSchema =
    -      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
    -        metadata
    -          // First tries to read requested schema, which may result from projections
    -          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
    -          // If not available, tries to read Catalyst schema from file metadata.  It's only
    -          // available if the target file is written by Spark SQL.
    -          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
    -      }.map(StructType.fromString).getOrElse {
    -        logInfo("Catalyst schema not available, falling back to Parquet schema")
    -        toCatalyst.convert(parquetRequestedSchema)
    -      }
    --- End diff --
    
    This "fallback" logic is removed because now we always set requested schema 
properly along the read path. This piece of code was inherited from the old 
Parquet support, which has already been removed.
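    
    To make that contract concrete, here is a minimal sketch (not part of this
    PR) of the write side: before parquet-mr calls `init()` on the executors,
    the read path is expected to have stored the requested Catalyst schema in
    the Hadoop configuration under
    `CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA`. The helper name below is
    hypothetical; the real wiring lives elsewhere in the Parquet read path.
    
    ```scala
    // Same package as CatalystReadSupport, since the class and the key
    // constant are package-private.
    package org.apache.spark.sql.execution.datasources.parquet
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.types.StructType
    
    // Hypothetical helper, not the PR's actual API.
    object ReadPathSketch {
      def setRequestedSchema(conf: Configuration, requestedSchema: StructType): Unit = {
        // Store the schema as JSON so that StructType.fromString() in
        // CatalystReadSupport.init() can parse it back; if this key is never
        // set, the assert() in init() fails.
        conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requestedSchema.json)
      }
    }
    ```
    
    Passing the schema through the configuration rather than through
    [[ReadContext]] metadata is what makes the private `var` sufficient:
    `init()` and `prepareForRead()` now run in the same executor-side instance.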

