Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21319#discussion_r188255611
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -17,48 +17,81 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader.{SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}

object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
    // PhysicalOperation guarantees that filters are deterministic; no need to check
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
+      val newReader = relation.createFreshReader
+      var newRelation = relation.copy(optimizedReader = Some(newReader))
-      val projectAttrs = project.map(_.toAttribute)
-      val projectSet = AttributeSet(project.flatMap(_.references))
-      val filterSet = AttributeSet(filters.flatMap(_.references))
+      val postScanFilters: Seq[Expression] = newReader match {
+        case r: SupportsPushDownCatalystFilters =>
+          val postScanFilters = r.pushCatalystFilters(filters.toArray)
+          newRelation = newRelation.copy(pushedFilters = r.pushedCatalystFilters())
+          postScanFilters
-      val projection = if (filterSet.subsetOf(projectSet) &&
-          AttributeSet(projectAttrs) == projectSet) {
-        // When the required projection contains all of the filter columns and column pruning alone
-        // can produce the required projection, push the required projection.
-        // A final projection may still be needed if the data source produces a different column
-        // order or if it cannot prune all of the nested columns.
-        projectAttrs
-      } else {
-        // When there are filter columns not already in the required projection or when the required
-        // projection is more complicated than column pruning, base column pruning on the set of
-        // all columns needed by both.
-        (projectSet ++ filterSet).toSeq
+        case r: SupportsPushDownFilters =>
+          // A map from translated data source filters to original catalyst filter expressions.
+          val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
+          // Catalyst filter expressions that can't be translated to data source filters.
+          val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+          for (filterExpr <- filters) {
+            val translated = DataSourceStrategy.translateFilter(filterExpr)
+            if (translated.isDefined) {
+              translatedFilterToExpr(translated.get) = filterExpr
+            } else {
+              untranslatableExprs += filterExpr
+            }
+          }
+
+          // Data source filters that need to be evaluated again after scanning, which means
+          // the data source cannot guarantee the rows returned can pass these filters.
+          // As a result we must return them so Spark can plan an extra Filter operator.
+          val postScanFilters =
+            r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
+          // The filters which are marked as pushed to this data source.
+          val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+          newRelation = newRelation.copy(pushedFilters = pushedFilters)
+          untranslatableExprs ++ postScanFilters
+
+        case _ => filters
      }
-      val newRelation = relation.copy(
-        projection = projection.asInstanceOf[Seq[AttributeReference]],
-        filters = Some(filters))
+      newReader match {
--- End diff ---

Move this into a new function too.
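
For illustration, a rough sketch of what that extraction could look like. Treat it as a sketch only: the helper name `pushDownFilters`, the `DataSourceReader` parameter type (which would need an import from `org.apache.spark.sql.sources.v2.reader`), and the `(relation, postScanFilters)` return shape are my assumptions, not part of this patch; it otherwise reuses the imports already at the top of this file.

```scala
private def pushDownFilters(
    reader: DataSourceReader,
    filters: Seq[Expression],
    relation: DataSourceV2Relation): (DataSourceV2Relation, Seq[Expression]) = {
  reader match {
    case r: SupportsPushDownCatalystFilters =>
      // This reader accepts catalyst expressions directly.
      val postScanFilters = r.pushCatalystFilters(filters.toArray)
      (relation.copy(pushedFilters = r.pushedCatalystFilters()), postScanFilters)

    case r: SupportsPushDownFilters =>
      // Translate what we can, remembering the reverse mapping; keep the rest.
      val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
      val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
      for (filterExpr <- filters) {
        DataSourceStrategy.translateFilter(filterExpr) match {
          case Some(translated) => translatedFilterToExpr(translated) = filterExpr
          case None => untranslatableExprs += filterExpr
        }
      }
      // Whatever pushFilters hands back must be re-evaluated after the scan,
      // so map it back to catalyst expressions through the reverse mapping.
      val postScanFilters =
        r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
      val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
      (relation.copy(pushedFilters = pushedFilters), untranslatableExprs ++ postScanFilters)

    case _ => (relation, filters)
  }
}
```

Then `apply` only has to destructure the pair, e.g. `val (newRelation, postScanFilters) = pushDownFilters(newReader, filters, relation.copy(optimizedReader = Some(newReader)))`.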
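
Also, since the diff appears to cut off right before the column pruning part: the `postScanFilters` returned above are exactly the predicates the source cannot guarantee, so the rule still has to plan a Filter on top of the scan when that list is non-empty. Roughly like this (again a sketch under the same assumed names, not the patch's exact code):

```scala
// Re-apply whatever the data source could not guarantee.
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(Filter(_, newRelation)).getOrElse(newRelation)
```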