cloud-fan commented on a change in pull request #28733:
URL: https://github.com/apache/spark/pull/28733#discussion_r437882318



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
##########
@@ -198,6 +200,90 @@ trait PredicateHelper {
     case e: Unevaluable => false
     case e => e.children.forall(canEvaluateWithinJoin)
   }
+
+  /**
+   * Convert an expression into conjunctive normal form.
+   * Definition and algorithm: 
https://en.wikipedia.org/wiki/Conjunctive_normal_form
+   * CNF can explode exponentially in the size of the input expression when 
converting Or clauses.
+   * Use a configuration MAX_CNF_NODE_COUNT to prevent such cases.
+   *
+   * @param condition to be converted into CNF.
+   * @return If the number of expressions exceeds threshold on converting Or, 
return Seq.empty.
+   *         If the conversion repeatedly expands nondeterministic 
expressions, return Seq.empty.
+   *         Otherwise, return the converted result as sequence of disjunctive 
expressions.
+   */
+  def conjunctiveNormalForm(condition: Expression): Seq[Expression] = {
+    val postOrderNodes = postOrderTraversal(condition)
+    val resultStack = new mutable.Stack[Seq[Expression]]
+    val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount
+    // Bottom up approach to get CNF of sub-expressions
+    while (postOrderNodes.nonEmpty) {
+      val cnf = postOrderNodes.pop() match {
+        case _: And =>
+          val right: Seq[Expression] = resultStack.pop()
+          val left: Seq[Expression] = resultStack.pop()
+          left ++ right
+        case _: Or =>
+          // For each side, there is no need to expand predicates of the same 
references.
+          // So here we can aggregate predicates of the same references as one 
single predicate,
+          // for reducing the size of pushed down predicates and corresponding 
codegen.
+          val right = aggregateExpressionsOfSameQualifiers(resultStack.pop())
+          val left = aggregateExpressionsOfSameQualifiers(resultStack.pop())
+          // Stop the loop whenever the result exceeds the `maxCnfNodeCount`
+          if (left.size * right.size > maxCnfNodeCount) {
+            Seq.empty
+          } else {
+            for {x <- left; y <- right} yield Or(x, y)
+          }
+        case other => other :: Nil
+      }
+      if (cnf.isEmpty) {
+        return Seq.empty
+      }
+      if (resultStack.length != 1) {
+        logWarning("The length of CNF conversion result stack is supposed to 
be 1. There might " +

Review comment:
       Shall we return Nil from here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to