Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21503#discussion_r194858392
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
---
@@ -17,15 +17,56 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference,
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
WriteToContinuousDataSourceExec}
object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case r: DataSourceV2Relation =>
- DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters,
r.reader) :: Nil
+ case PhysicalOperation(project, filters, relation:
DataSourceV2Relation) =>
+ val projectSet = AttributeSet(project.flatMap(_.references))
+ val filterSet = AttributeSet(filters.flatMap(_.references))
+
+ val projection = if (filterSet.subsetOf(projectSet) &&
+ AttributeSet(relation.output) == projectSet) {
+ // When the required projection contains all of the filter columns
and column pruning alone
+ // can produce the required projection, push the required
projection.
+ // A final projection may still be needed if the data source
produces a different column
+ // order or if it cannot prune all of the nested columns.
+ relation.output
+ } else {
+ // When there are filter columns not already in the required
projection or when the required
+ // projection is more complicated than column pruning, base column
pruning on the set of
+ // all columns needed by both.
+ (projectSet ++ filterSet).toSeq
+ }
+
+ val reader = relation.newReader
--- End diff --
> there's nothing forcing other data sources to implement the new trait ...
Hmm, I'm a little confused here. All v2 data sources would have to apply
pushdown twice, right? Or are you suggesting we should not migrate file-based
data sources to Data Source V2?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]