Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21503#discussion_r194895594
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
---
@@ -17,15 +17,56 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case r: DataSourceV2Relation =>
- DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+ case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+ val projectSet = AttributeSet(project.flatMap(_.references))
+ val filterSet = AttributeSet(filters.flatMap(_.references))
+
+ val projection = if (filterSet.subsetOf(projectSet) &&
+ AttributeSet(relation.output) == projectSet) {
+ // When the required projection contains all of the filter columns and column pruning alone
+ // can produce the required projection, push the required projection.
+ // A final projection may still be needed if the data source produces a different column
+ // order or if it cannot prune all of the nested columns.
+ relation.output
+ } else {
+ // When there are filter columns not already in the required projection or when the required
+ // projection is more complicated than column pruning, base column pruning on the set of
+ // all columns needed by both.
+ (projectSet ++ filterSet).toSeq
+ }
+
+ val reader = relation.newReader
--- End diff --
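For reference, the projection choice in the quoted diff can be modelled outside Spark roughly as follows. This is only a sketch: plain column names stand in for Catalyst attributes, `ProjectionChoice` and `chooseProjection` are hypothetical names, and ordering the fallback by the relation's output is an assumption made here for determinism (the diff uses `(projectSet ++ filterSet).toSeq`).

```scala
// Plain-Scala model of the projection choice in the diff above.
// Column names (String) stand in for Catalyst attributes; all names are hypothetical.
object ProjectionChoice {
  def chooseProjection(
      relationOutput: Seq[String],
      projectRefs: Set[String],
      filterRefs: Set[String]): Seq[String] = {
    if (filterRefs.subsetOf(projectRefs) && relationOutput.toSet == projectRefs) {
      // Every filter column is already projected and the projection covers
      // exactly the relation's columns: keep the relation's output as is.
      relationOutput
    } else {
      // Otherwise prune to the union of columns needed by the projection and
      // the filters. Assumption: keep the relation's column order.
      val needed = projectRefs ++ filterRefs
      relationOutput.filter(needed.contains)
    }
  }
}
```

With a relation exposing `a`, `b`, `c`, a projection on `a` and a filter on `b`, the model takes the second branch and keeps `a` and `b`.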
Yes, the second proposal is what happens for the v1 data sources. For
file-based data sources we effectively pick the third proposal and add an
optimizer rule, `PruneFileSourcePartitions`, that pushes some of the filters
down to the data source in the logical phase to get precise stats.
I'd like to pick between the 2nd and 3rd proposals (the 3rd proposal is also
temporary, until we move stats to the physical plan). Applying pushdown twice
is hard to work around (the result would need to be cached), while we can keep
the `PruneFileSourcePartitions` rule to work around the issue with the 2nd
proposal for file-based data sources.
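To make the caching concern concrete, here is a rough sketch of what memoizing a pushdown result could look like, so that a second planning phase reuses the first phase's outcome instead of pushing filters again. Everything in it is hypothetical and independent of Spark: `PushdownCache`, `PushdownResult`, the string-based filters, and the rule that only equality predicates are accepted by the source.

```scala
import scala.collection.mutable

// Hypothetical model: not Spark API. Filters are plain strings and the
// "source" accepts only predicates of the form "col = value".
object PushdownCache {
  final case class PushdownResult(pushed: Seq[String], postScan: Seq[String])

  // Counts how often the (potentially expensive, stateful) pushdown really runs.
  var pushdownCalls: Int = 0

  private def pushDown(source: String, filters: Seq[String]): PushdownResult = {
    pushdownCalls += 1
    val (pushed, postScan) = filters.partition(_.contains(" = "))
    PushdownResult(pushed, postScan)
  }

  // Cache keyed by source and filter list, so repeated planning of the same
  // query reuses the earlier result.
  private val cache = mutable.Map.empty[(String, Seq[String]), PushdownResult]

  def pushDownOnce(source: String, filters: Seq[String]): PushdownResult =
    cache.getOrElseUpdate((source, filters), pushDown(source, filters))
}
```

The sketch also hints at why this is awkward: the cache key has to capture everything that influences pushdown, and the cache needs a lifetime tied to the query being planned.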
Let's also get more input from other people.
---