cloud-fan commented on a change in pull request #34001:
URL: https://github.com/apache/spark/pull/34001#discussion_r709343963



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
##########
@@ -427,3 +433,158 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
     case _ => Nil
   }
 }
+
+private[sql] object DataSourceV2Strategy {
+
+  private def translateLeafNodeFilterV2(
+      predicate: Expression,
+      pushableColumn: PushableColumnBase): Option[V2Filter] = predicate match {
+    case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))
+    case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))
+    case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))
+    case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))
+    case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+    case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+    case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+
+    case in @ expressions.InSet(pushableColumn(name), set) =>
+      val values: Array[V2Literal[_]] =
+        set.toSeq.map(elem => LiteralValue(elem, in.dataType)).toArray
+      Some(new V2In(FieldReference(name), values))
+
+    // The optimizer only converts In to InSet when the list has more than a certain
+    // number of items, so an In expression may still reach this point and need to be
+    // pushed down.
+    case in @ expressions.In(pushableColumn(name), list) if 
list.forall(_.isInstanceOf[Literal]) =>
+      val hSet = list.map(_.eval(EmptyRow))
+      Some(new V2In(FieldReference(name),
+        hSet.toArray.map(LiteralValue(_, in.value.dataType))))
+
+    case expressions.IsNull(pushableColumn(name)) =>
+      Some(new V2IsNull(FieldReference(name)))
+    case expressions.IsNotNull(pushableColumn(name)) =>
+      Some(new V2IsNotNull(FieldReference(name)))
+
+    case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, 
StringType)) =>
+      Some(new V2StringStartsWith(FieldReference(name), v))
+
+    case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, 
StringType)) =>
+      Some(new V2StringEndsWith(FieldReference(name), v))
+
+    case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, 
StringType)) =>
+      Some(new V2StringContains(FieldReference(name), v))
+
+    case expressions.Literal(true, BooleanType) =>
+      Some(new V2AlwaysTrue)
+
+    case expressions.Literal(false, BooleanType) =>
+      Some(new V2AlwaysFalse)
+
+    case _ => None
+  }
+
+    /**
+     * Tries to translate a Catalyst [[Expression]] into data source 
[[Filter]].
+     *
+     * @return a `Some[Filter]` if the input [[Expression]] is convertible, 
otherwise a `None`.
+     */
+    protected[sql] def translateFilterV2(
+        predicate: Expression,
+        supportNestedPredicatePushdown: Boolean): Option[V2Filter] = {
+      translateFilterV2WithMapping(predicate, None, 
supportNestedPredicatePushdown)
+    }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   *
+   * @param predicate The input [[Expression]] to be translated as [[Filter]]
+   * @param translatedFilterToExpr An optional map from each translated leaf [[Filter]] to the
+   *                               Catalyst [[Expression]] it was translated from. The map is
+   *                               used for rebuilding [[Expression]] from [[Filter]].
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, 
otherwise a `None`.
+   */
+  protected[sql] def translateFilterV2WithMapping(
+      predicate: Expression,
+      translatedFilterToExpr: Option[mutable.HashMap[V2Filter, Expression]],
+      nestedPredicatePushdownEnabled: Boolean)
+  : Option[V2Filter] = {
+    predicate match {
+      case expressions.And(left, right) =>
+        // See SPARK-12218 for detailed discussion
+        // It is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have (a = 2 AND trim(b) = 'blah') OR (c > 0)
+        // and we do not understand how to convert trim(b) = 'blah'.
+        // If we only convert a = 2, we will end up with
+        // (a = 2) OR (c > 0), which will generate wrong results.
+        // Pushing one leg of AND down is only safe to do at the top level.
+        // You can see ParquetFilters' createFilter for more details.
+        for {
+          leftFilter <- translateFilterV2WithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterV2WithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+        } yield new V2And(leftFilter, rightFilter)
+
+      case expressions.Or(left, right) =>
+        for {
+          leftFilter <- translateFilterV2WithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterV2WithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+        } yield new V2Or(leftFilter, rightFilter)
+
+      case expressions.Not(child) =>
+        translateFilterV2WithMapping(child, translatedFilterToExpr, 
nestedPredicatePushdownEnabled)
+          .map(new V2Not(_))
+
+      case other =>
+        val filter = translateLeafNodeFilterV2(
+          other, PushableColumn(nestedPredicatePushdownEnabled))
+        if (filter.isDefined && translatedFilterToExpr.isDefined) {
+          translatedFilterToExpr.get(filter.get) = predicate
+        }
+        filter
+    }
+  }
+
+  protected[sql] def rebuildExpressionFromFilter(
+      filter: V2Filter,
+      translatedFilterToExpr: mutable.HashMap[V2Filter, Expression]): 
Expression = {
+    filter match {
+      case and: V2And =>
+        expressions.And(rebuildExpressionFromFilter(and.left, 
translatedFilterToExpr),
+          rebuildExpressionFromFilter(and.right, translatedFilterToExpr))
+      case or: V2Or =>
+        expressions.Or(rebuildExpressionFromFilter(or.left, 
translatedFilterToExpr),
+          rebuildExpressionFromFilter(or.right, translatedFilterToExpr))
+      case not: V2Not =>
+        expressions.Not(rebuildExpressionFromFilter(not.child, 
translatedFilterToExpr))
+      case other =>
+        translatedFilterToExpr.getOrElse(other,
+          throw QueryCompilationErrors.failedToRebuildExpressionError(filter))

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to