HyukjinKwon commented on code in PR #36327:
URL: https://github.com/apache/spark/pull/36327#discussion_r857226336
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:
##########
@@ -509,16 +478,73 @@ case class FileSourceScanExec(
} else {
None
}
- } ++ {
- if (relation.partitionSchema.nonEmpty) {
- Map(
-        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
- "pruningTime" ->
-          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"))
+ } ++ driverMetrics
+}
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
+ * [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class FileSourceScanExec(
+ @transient override val relation: HadoopFsRelation,
+ override val output: Seq[Attribute],
+ override val requiredSchema: StructType,
+ override val partitionFilters: Seq[Expression],
+ override val optionalBucketSet: Option[BitSet],
+ override val optionalNumCoalescedBuckets: Option[Int],
+ override val dataFilters: Seq[Expression],
+ override val tableIdentifier: Option[TableIdentifier],
+ override val disableBucketedScan: Boolean = false)
+ extends FileSourceScanLike {
+
+  // Note that some vals referring the file-based relation are lazy intentionally
+  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
+ override lazy val supportsColumnar: Boolean = {
+ relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ }
+
+ private lazy val needsUnsafeRowConversion: Boolean = {
+ if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+ conf.parquetVectorizedReaderEnabled
} else {
- Map.empty[String, SQLMetric]
+ false
}
- } ++ staticMetrics
+ }
+
+ lazy val inputRDD: RDD[InternalRow] = {
Review Comment:
I like this idea of separating planning/metrics stuff vs RDD/execution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]