Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21503#discussion_r194861645
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}

 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff ---
I didn't realize you were talking about other v2 sources. Yes, two readers
would be configured for v2. If you wanted to avoid that, an implementation
where pushdown is expensive could cache the pushdown result, or we could add
something else that prevents that case.
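To make the caching idea concrete, here is a minimal sketch (hypothetical
`CachingReader`, `ExpensivePlanCache`, and `listFiles` names, not code from
this PR) of a v2 source that memoizes its expensive planning work per
pushed-filter set, so configuring a second reader with the same pushdown does
not repeat it:

```scala
// Hypothetical sketch, not part of this PR: a v2 reader that memoizes the
// expensive part of planning (e.g. file listing) per pushed-filter set, so a
// second reader configured with the same pushdown does not repeat the work.
import java.util.Collections

import scala.collection.concurrent.TrieMap

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsPushDownFilters}
import org.apache.spark.sql.types.StructType

object ExpensivePlanCache {
  // Keyed by the pushed filters; the value is whatever result is costly to
  // compute. Illustrative only.
  val cache = TrieMap.empty[Seq[Filter], Seq[String]]
}

class CachingReader(schema: StructType, listFiles: Seq[Filter] => Seq[String])
    extends DataSourceReader with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    filters // report all filters as residuals; Spark re-evaluates them
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def readSchema(): StructType = schema

  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
    // The expensive listing runs once per distinct filter set, even if two
    // readers are configured (once for stats, once for the physical scan).
    val files = ExpensivePlanCache.cache.getOrElseUpdate(pushed.toSeq, listFiles(pushed.toSeq))
    // Building real partitions from `files` is elided in this sketch.
    Collections.emptyList[InputPartition[InternalRow]]()
  }
}
```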
We need to do *something* to fix the current behavior of doing pushdown in
the optimizer. I'm perfectly happy with less accurate stats for v2 until stats
use the physical plan, or with a solution like this where pushdown happens
twice. I just don't think it is a good idea to continue with a design where the
logical plan needs to use the v2 reader APIs. I think we agree that pushdown
should happen only once, and conversion to the physical plan is where it makes
the most sense.
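For reference, applying pushdown at conversion time would look roughly like
this with the v2 mixins (a sketch with an assumed `configureReader` helper
name, not necessarily the exact code in this PR):

```scala
// Sketch with an assumed helper name; the mixin calls are the real v2 reader
// APIs, but this is not necessarily the exact code in this PR.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

def configureReader(
    reader: DataSourceReader,
    filters: Seq[Expression],
    prunedSchema: StructType): Unit = {
  reader match {
    case r: SupportsPushDownFilters =>
      // Translate catalyst predicates to source Filters; untranslatable ones
      // stay in the plan as a post-scan Filter.
      val translated = filters.flatMap(DataSourceStrategy.translateFilter)
      r.pushFilters(translated.toArray)
    case _ => // source does not support filter pushdown
  }
  reader match {
    case r: SupportsPushDownRequiredColumns =>
      // Ask the source to prune to just the columns the query needs.
      r.pruneColumns(prunedSchema)
    case _ => // source does not support column pruning
  }
}
```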
---