Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/19451#discussion_r143804627
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1242,6 +1243,53 @@ object ReplaceIntersectWithSemiJoin extends
Rule[LogicalPlan] {
}
/**
+ * If one or both of the datasets in the logical [[Except]] operator are
purely transformed using
+ * [[Filter]], this rule will replace logical [[Except]] operator with a
[[Filter]] operator by
+ * flipping the filter condition of the right child.
+ * {{{
+ * SELECT a1, a2 FROM Tab1 WHERE a2 = 12 EXCEPT SELECT a1, a2 FROM Tab1
WHERE a1 = 5
+ * ==> SELECT a1, a2 FROM Tab1 WHERE a2 = 12 AND a1 <> 5
+ * }}}
+ *
+ * Note:
+ * 1. We should combine all the [[Filter]] of the right node before
flipping it using NOT operator.
+ */
+object ReplaceExceptWithNotFilter extends Rule[LogicalPlan] {
+
+ implicit def nodeToFilter(node: LogicalPlan): Filter =
node.asInstanceOf[Filter]
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Except(left, right) if isEligible(left, right) =>
+ Distinct(
+ Filter(Not(replaceAttributesIn(combineFilters(right).condition,
left)), left)
+ )
+ }
+
+ def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left,
right) match {
+ case (left: Filter, right: Filter) =>
parent(left).sameResult(parent(right))
+ case (left, right: Filter) => left.sameResult(parent(right))
+ case _ => false
+ }
+
+ def parent(plan: LogicalPlan): LogicalPlan = plan match {
--- End diff --
This is actually the child, not the parent. Can you find a better function name?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]