cloud-fan commented on a change in pull request #35768:
URL: https://github.com/apache/spark/pull/35768#discussion_r831796162



##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
##########
@@ -103,12 +125,46 @@ protected String visitIsNotNull(String v) {
     return v + " IS NOT NULL";
   }
 
+  protected String visitStartsWith(String l, String r) {
+    // Remove quotes at the beginning and end.
+    // e.g. converts "'str'" to "str".
+    String value = r.substring(1, r.length() - 1);
+    return l + " LIKE '" + value + "%'";
+  }
+
+  protected String visitEndsWith(String l, String r) {
+    // Remove quotes at the beginning and end.
+    // e.g. converts "'str'" to "str".
+    String value = r.substring(1, r.length() - 1);
+    return l + " LIKE '%" + value + "'";
+  }
+
+  protected String visitContains(String l, String r) {
+    // Remove quotes at the beginning and end.
+    // e.g. converts "'str'" to "str".
+    String value = r.substring(1, r.length() - 1);
+    return l + " LIKE '%" + value + "%'";
+  }
+
+  private String inputToSQL(Expression input) {

Review comment:
       this should be called in more places, such as AND, OR, NOT, etc.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
##########
@@ -39,51 +44,119 @@ class V2ExpressionBuilder(e: Expression) {
     case _ => false
   }
 
-  private def generateExpression(expr: Expression): Option[V2Expression] = 
expr match {
+  private def generateExpression(
+      expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = 
expr match {
+    case Literal(true, BooleanType) => Some(new AlwaysTrue())
+    case Literal(false, BooleanType) => Some(new AlwaysFalse())
     case Literal(value, dataType) => Some(LiteralValue(value, dataType))
-    case attr: Attribute => Some(FieldReference.column(attr.name))
+    case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
+      if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
+        Some(new V2Predicate("=", Array(FieldReference(name), 
LiteralValue(true, BooleanType))))
+      } else {
+        Some(FieldReference(name))
+      }
+    case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
+      Some(FieldReference.column(name))
+    case in @ InSet(child, hset) =>
+      generateExpression(child).map { v =>
+        val children =
+          (v +: hset.toSeq.map(elem => LiteralValue(elem, 
in.dataType))).toArray[V2Expression]
+        new V2Predicate("IN", children)
+      }
+    // Because we only convert In to InSet in Optimizer when there are more 
than certain
+    // items. So it is possible we still get an In expression here that needs 
to be pushed
+    // down.
+    case In(value, list) =>
+      val v = generateExpression(value)
+      val listExpressions = list.flatMap(generateExpression(_))
+      if (v.isDefined && list.length == listExpressions.length) {
+        val children = (v.get +: listExpressions).toArray[V2Expression]
+        // The children looks like [expr, value1, ..., valueN]
+        Some(new V2Predicate("IN", children))
+      } else {
+        None
+      }
     case IsNull(col) => generateExpression(col)
-      .map(c => new GeneralScalarExpression("IS_NULL", Array[V2Expression](c)))
+      .map(c => new V2Predicate("IS_NULL", Array[V2Expression](c)))
     case IsNotNull(col) => generateExpression(col)
-      .map(c => new GeneralScalarExpression("IS_NOT_NULL", 
Array[V2Expression](c)))
-    case b: BinaryOperator if canTranslate(b) =>
-      val left = generateExpression(b.left)
-      val right = generateExpression(b.right)
+      .map(c => new V2Predicate("IS_NOT_NULL", Array[V2Expression](c)))
+    case p: StringPredicate =>
+      val left = generateExpression(p.left)
+      val right = generateExpression(p.right)
       if (left.isDefined && right.isDefined) {
-        Some(new GeneralScalarExpression(b.sqlOperator, 
Array[V2Expression](left.get, right.get)))
+        val name = p match {
+          case _: StartsWith => "STARTS_WITH"
+          case _: EndsWith => "ENDS_WITH"
+          case _: Contains => "CONTAINS"
+        }
+        Some(new V2Predicate(name, Array[V2Expression](left.get, right.get)))
+      } else {
+        None
+      }
+    case and: And =>
+      val l = generateExpression(and.left, true)

Review comment:
       nit: let's add comments `// AND expects predicate`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
##########
@@ -220,12 +222,18 @@ abstract class JdbcDialect extends Serializable with 
Logging{
   }
 
   class JDBCSQLBuilder extends V2ExpressionSQLBuilder {
-    override def visitFieldReference(fieldRef: FieldReference): String = {
-      if (fieldRef.fieldNames().length != 1) {
+    override def visitLiteral(literal: Literal[_]): String = {
+      compileValue(
+        CatalystTypeConverters.convertToScala(literal.value(), 
literal.dataType())).toString
+    }
+
+    override def visitNamedReference(namedRef: NamedReference): String = {
+      if (namedRef.fieldNames().length > 1) {
         throw new IllegalArgumentException(

Review comment:
       since we catch `NonFatal` now, we don't need to keep 
`IllegalArgumentException` anymore.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
##########
@@ -39,51 +44,119 @@ class V2ExpressionBuilder(e: Expression) {
     case _ => false
   }
 
-  private def generateExpression(expr: Expression): Option[V2Expression] = 
expr match {
+  private def generateExpression(
+      expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = 
expr match {
+    case Literal(true, BooleanType) => Some(new AlwaysTrue())
+    case Literal(false, BooleanType) => Some(new AlwaysFalse())
     case Literal(value, dataType) => Some(LiteralValue(value, dataType))
-    case attr: Attribute => Some(FieldReference.column(attr.name))
+    case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
+      if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
+        Some(new V2Predicate("=", Array(FieldReference(name), 
LiteralValue(true, BooleanType))))
+      } else {
+        Some(FieldReference(name))
+      }
+    case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
+      Some(FieldReference.column(name))
+    case in @ InSet(child, hset) =>
+      generateExpression(child).map { v =>
+        val children =
+          (v +: hset.toSeq.map(elem => LiteralValue(elem, 
in.dataType))).toArray[V2Expression]
+        new V2Predicate("IN", children)
+      }
+    // Because we only convert In to InSet in Optimizer when there are more 
than certain
+    // items. So it is possible we still get an In expression here that needs 
to be pushed
+    // down.
+    case In(value, list) =>
+      val v = generateExpression(value)
+      val listExpressions = list.flatMap(generateExpression(_))
+      if (v.isDefined && list.length == listExpressions.length) {
+        val children = (v.get +: listExpressions).toArray[V2Expression]
+        // The children looks like [expr, value1, ..., valueN]
+        Some(new V2Predicate("IN", children))
+      } else {
+        None
+      }
     case IsNull(col) => generateExpression(col)
-      .map(c => new GeneralScalarExpression("IS_NULL", Array[V2Expression](c)))
+      .map(c => new V2Predicate("IS_NULL", Array[V2Expression](c)))
     case IsNotNull(col) => generateExpression(col)
-      .map(c => new GeneralScalarExpression("IS_NOT_NULL", 
Array[V2Expression](c)))
-    case b: BinaryOperator if canTranslate(b) =>
-      val left = generateExpression(b.left)
-      val right = generateExpression(b.right)
+      .map(c => new V2Predicate("IS_NOT_NULL", Array[V2Expression](c)))
+    case p: StringPredicate =>
+      val left = generateExpression(p.left)
+      val right = generateExpression(p.right)
       if (left.isDefined && right.isDefined) {
-        Some(new GeneralScalarExpression(b.sqlOperator, 
Array[V2Expression](left.get, right.get)))
+        val name = p match {
+          case _: StartsWith => "STARTS_WITH"
+          case _: EndsWith => "ENDS_WITH"
+          case _: Contains => "CONTAINS"
+        }
+        Some(new V2Predicate(name, Array[V2Expression](left.get, right.get)))
+      } else {
+        None
+      }
+    case and: And =>
+      val l = generateExpression(and.left, true)
+      val r = generateExpression(and.right, true)
+      if (l.isDefined && r.isDefined) {
+        assert(l.get.isInstanceOf[V2Predicate] && 
r.get.isInstanceOf[V2Predicate])
+        Some(new V2And(l.get.asInstanceOf[V2Predicate], 
r.get.asInstanceOf[V2Predicate]))
+      } else {
+        None
+      }
+    case or: Or =>
+      val l = generateExpression(or.left, true)

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -264,11 +265,165 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     df.queryExecution.optimizedPlan.collect {
       case _: DataSourceV2ScanRelation =>
         val expected_plan_fragment =
-          "PushedFilters: [IsNotNull(ID), GreaterThan(ID,1)]"
+          "PushedFilters: [ID IS NOT NULL, ID > 1]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
 
     checkAnswer(df, Row("mary", 2))
+
+    val df2 = spark.table("h2.test.employee").filter($"name".isin("amy", 
"cathy"))
+
+    checkFiltersRemoved(df2)
+
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [NAME IN ('amy', 'cathy')]"
+        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+    }
+
+    checkAnswer(df2, Seq(Row(1, "amy", 10000, 1000, true), Row(1, "cathy", 
9000, 1200, false)))
+
+    val df3 = spark.table("h2.test.employee").filter($"name".startsWith("a"))
+
+    checkFiltersRemoved(df3)
+
+    df3.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [NAME IS NOT NULL, NAME LIKE 'a%']"
+        checkKeywordsExistsInExplain(df3, expected_plan_fragment)
+    }
+
+    checkAnswer(df3, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 
12000, 1200, false)))
+
+    val df4 = spark.table("h2.test.employee").filter($"is_manager")
+
+    checkFiltersRemoved(df4)
+
+    df4.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true]"
+        checkKeywordsExistsInExplain(df4, expected_plan_fragment)
+    }
+
+    checkAnswer(df4, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "david", 
10000, 1300, true),
+      Row(6, "jen", 12000, 1200, true)))
+
+    val df5 = 
spark.table("h2.test.employee").filter($"is_manager".and($"salary" > 10000))
+
+    checkFiltersRemoved(df5)
+
+    df5.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [IS_MANAGER IS NOT NULL, SALARY IS NOT NULL, " +
+            "IS_MANAGER = true, SALARY > 10000.00]"
+        checkKeywordsExistsInExplain(df5, expected_plan_fragment)
+    }
+
+    checkAnswer(df5, Seq(Row(6, "jen", 12000, 1200, true)))
+
+    val df6 = 
spark.table("h2.test.employee").filter($"is_manager".or($"salary" > 10000))
+
+    checkFiltersRemoved(df6)
+
+    df6.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [(IS_MANAGER = true) OR (SALARY > 10000.00)], "
+        checkKeywordsExistsInExplain(df6, expected_plan_fragment)
+    }
+
+    checkAnswer(df6, Seq(Row(1, "amy", 10000, 1000, true), Row(2, "alex", 
12000, 1200, false),
+      Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
+
+    val df7 = spark.table("h2.test.employee").filter(not($"is_manager") === 
true)
+
+    checkFiltersRemoved(df7)
+
+    df7.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [IS_MANAGER IS NOT NULL, NOT (IS_MANAGER = true)], "
+        checkKeywordsExistsInExplain(df7, expected_plan_fragment)
+    }
+
+    checkAnswer(df7, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 
12000, 1200, false)))
+
+    val df8 = spark.table("h2.test.employee").filter($"is_manager" === true)
+
+    checkFiltersRemoved(df8)
+
+    df8.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [IS_MANAGER IS NOT NULL, IS_MANAGER = true], "
+        checkKeywordsExistsInExplain(df8, expected_plan_fragment)
+    }
+
+    checkAnswer(df8, Seq(Row(1, "amy", 10000, 1000, true),
+      Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
+
+    val df9 = spark.table("h2.test.employee")
+      .filter(when($"dept" > 1, true).when($"is_manager", 
false).otherwise($"dept" > 3))
+
+    checkFiltersRemoved(df9)
+
+    df9.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedFilters: [CASE WHEN DEPT > 1 THEN TRUE WHEN IS_MANAGER = true 
THEN FALSE" +
+            " ELSE DEPT > 3 END], "
+        checkKeywordsExistsInExplain(df9, expected_plan_fragment)
+    }
+
+    checkAnswer(df9, Seq(Row(2, "alex", 12000, 1200, false),
+      Row(2, "david", 10000, 1300, true), Row(6, "jen", 12000, 1200, true)))
+  }
+
+  test("scan with complex filter push-down") {
+    Seq(false, true).foreach { ansiMode =>
+      withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiMode.toString) {
+        val df = spark.table("h2.test.people").filter($"id" + 1 > 1)
+
+        checkFiltersRemoved(df, ansiMode)
+
+        df.queryExecution.optimizedPlan.collect {
+          case _: DataSourceV2ScanRelation =>
+            val expected_plan_fragment = if (ansiMode) {
+              "PushedFilters: [ID IS NOT NULL, (ID + 1) > 1]"
+            } else {
+              "PushedFilters: [ID IS NOT NULL]"
+            }
+            checkKeywordsExistsInExplain(df, expected_plan_fragment)
+        }
+
+        checkAnswer(df, Seq(Row("fred", 1), Row("mary", 2)))
+
+        val df2 = sql("""
+                        |SELECT * FROM h2.test.employee
+                        |WHERE (CASE WHEN SALARY > 10000 THEN BONUS ELSE BONUS 
+ 200 END) > 1200

Review comment:
       We don't need to test CASE WHEN here, as the key is the ADD operator. 
I'd suggest the second test case to test the overflow case, e.g. `ID + Int.Max 
> 1`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to