Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/8553#discussion_r38617544
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---
@@ -32,31 +32,56 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The API of [[ReadSupport]] is a bit overly complicated for historical reasons. In older
+ * versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated
+ * and initialized twice, on both the driver side and the executor side. The [[init()]] method
+ * is for driver side initialization, while [[prepareForRead()]] is for the executor side.
+ * However, starting from parquet-mr 1.6.0, this is no longer the case, and [[ReadSupport]] is
+ * only instantiated and initialized on the executor side. So, theoretically, it is now totally
+ * fine to combine these two methods into a single initialization method. The only reason (I
+ * could think of) to still keep them separate is backwards-compatibility with the parquet-mr
+ * API.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
- // Called after `init()` when initializing Parquet record reader.
+ private var catalystRequestedSchema: StructType = _
+
+  /**
+   * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
+   * readers. Responsible for figuring out the Parquet requested schema used for column pruning.
+   */
+  override def init(context: InitContext): ReadContext = {
+    catalystRequestedSchema = {
+      val conf = context.getConfiguration
+      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      assert(schemaString != null, "Parquet requested schema not set.")
+      StructType.fromString(schemaString)
+    }
+
+    val parquetRequestedSchema =
+      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record
+   * readers. Responsible for instantiating [[RecordMaterializer]], which is used for converting
+   * Parquet records to Catalyst [[InternalRow]]s.
+   */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
     val parquetRequestedSchema = readContext.getRequestedSchema
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata. It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logInfo("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
--- End diff --
This "fallback" logic is removed because we now always set the requested schema
properly along the read path. This piece of code was inherited from the old
Parquet support, which has already been removed.
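For readers skimming the diff: the deleted block tried three sources in order (the requested schema from the read-support metadata, then the Catalyst schema stored in file metadata, then a conversion of the Parquet schema). A minimal, self-contained sketch of that `Option.orElse`/`getOrElse` fallback pattern, using hypothetical stand-in names (`FallbackSketch`, `resolveSchema`, plain `String` schemas) rather than the real Spark types:

```scala
// Illustrative sketch only; names and types are stand-ins, not Spark's API.
object FallbackSketch {
  // A stand-in for a real schema type such as StructType.
  type Schema = String

  // Try the requested-schema key first, then the file-metadata key,
  // and finally fall back to converting the Parquet schema.
  def resolveSchema(
      metadata: Map[String, String],
      requestedKey: String,
      fileKey: String,
      convertFromParquet: () => Schema): Schema = {
    metadata.get(requestedKey)                // 1. projection-pruned requested schema
      .orElse(metadata.get(fileKey))          // 2. Catalyst schema stored in file metadata
      .getOrElse(convertFromParquet())        // 3. convert the Parquet schema as a last resort
  }
}
```

Since the read path now always sets the requested schema, the first lookup can no longer miss, which is why the whole chain could be dropped.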