gengliangwang commented on a change in pull request #28733:
URL: https://github.com/apache/spark/pull/28733#discussion_r435763240



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
##########
@@ -1230,4 +1237,134 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(Optimize.execute(query.analyze), expected)
   }
+
+  test("inner join: rewrite filter predicates to conjunctive normal form") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y)
+        .where(("x.b".attr === "y.b".attr)
+          && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11)))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where(('a > 3 || 'a > 1)).subquery('x)
+    val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
+    val correctAnswer =
+      left.join(right, condition = Some("x.b".attr === "y.b".attr
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("inner join: rewrite join predicates to conjunctive normal form") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, condition = Some(("x.b".attr === "y.b".attr)
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('a > 3 || 'a > 1).subquery('x)
+    val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
+    val correctAnswer =
+      left.join(right, condition = Some("x.b".attr === "y.b".attr
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("inner join: rewrite join predicates(with NOT predicate) to conjunctive 
normal form") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, condition = Some(("x.b".attr === "y.b".attr)
+        && Not(("x.a".attr > 3)
+        && ("x.a".attr < 2 || ("y.a".attr > 13)) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('a <= 3 || 'a >= 2).subquery('x)
+    val right = testRelation.subquery('y)
+    val correctAnswer =
+      left.join(right, condition = Some("x.b".attr === "y.b".attr
+        && (("x.a".attr <= 3) || (("x.a".attr >= 2) && ("y.a".attr <= 13)))
+        && (("x.a".attr <= 1) || ("y.a".attr <= 11))))
+        .analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("left join: rewrite join predicates to conjunctive normal form") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, joinType = LeftOuter, condition = Some(("x.b".attr === 
"y.b".attr)
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.subquery('x)
+    val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
+    val correctAnswer =
+      left.join(right, joinType = LeftOuter, condition = Some("x.b".attr === 
"y.b".attr
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("right join: rewrite join predicates to conjunctive normal form") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, joinType = RightOuter, condition = Some(("x.b".attr === 
"y.b".attr)
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('a > 3 || 'a > 1).subquery('x)
+    val right = testRelation.subquery('y)
+    val correctAnswer =
+      left.join(right, joinType = RightOuter, condition = Some("x.b".attr === 
"y.b".attr
+        && (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && 
("y.a".attr > 11))))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test(s"Disable rewrite to CNF by setting 
${SQLConf.MAX_CNF_NODE_COUNT.key}=0") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, condition = Some(("x.b".attr === "y.b".attr)
+        && ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5))
+        || (("y.a".attr > 2) && ("y.c".attr < 1)))))
+    }
+
+    Seq(0, 10).foreach { count =>
+      withSQLConf(SQLConf.MAX_CNF_NODE_COUNT.key -> count.toString) {
+        val optimized = Optimize.execute(originalQuery.analyze)
+        val (left, right) = if (count == 0) {
+          (testRelation.subquery('x), testRelation.subquery('y))
+        } else {
+          (testRelation.subquery('x),
+            testRelation.where(('c <= 5 || 'c < 1) && ('c <=5 || 'a > 
2)).subquery('y))

Review comment:
       @wangyum To make it simple, this PR didn't convert the pushed down 
predicate to a shorter form.
   We can have a follow-up PR if you like that feature.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to