Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/21857#discussion_r205553527
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
---
@@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
*/
object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case Except(left, right) =>
+ case Except(left, right, false) =>
assert(left.output.size == right.output.size)
val joinCond = left.output.zip(right.output).map { case (l, r) =>
EqualNullSafe(l, r) }
Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And)))
}
}
+/**
+ * Replaces the logical [[Except]] operator with ALL semantics (EXCEPT ALL)
+ * using a combination of Union, Aggregate and Generate operators.
+ *
+ * Input Query:
+ * {{{
+ * SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ * SELECT c1
+ * FROM (
+ * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1)
+ * FROM (
+ * SELECT c1, sum_val
+ * FROM (
+ * SELECT c1, sum(vcol) AS sum_val
+ * FROM (
+ * SELECT 1L as vcol, c1 FROM ut1
+ * UNION ALL
+ * SELECT -1L as vcol, c1 FROM ut2
+ * ) AS union_all
+ * GROUP BY union_all.c1
+ * )
+ * WHERE sum_val > 0
+ * )
+ * )
+ * }}}
+ */
+
+object RewriteExcepAll extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Except(left, right, true) =>
+ assert(left.output.size == right.output.size)
+
+ val newColumnLeft = Alias(Literal(1L), "vcol")()
+ val newColumnRight = Alias(Literal(-1L), "vcol")()
+ val modifiedLeftPlan = Project(Seq(newColumnLeft) ++ left.output,
left)
+ val modifiedRightPlan = Project(Seq(newColumnRight) ++ right.output,
right)
+ val unionPlan = Union(modifiedLeftPlan, modifiedRightPlan)
+ val aggSumCol =
+ Alias(AggregateExpression(Sum(unionPlan.output.head.toAttribute),
Complete, false), "sum")()
+ val aggOutputColumns = left.output ++ Seq(aggSumCol)
+ val aggregatePlan = Aggregate(left.output, aggOutputColumns,
unionPlan)
+ val filteredAggPlan = Filter(GreaterThan(aggSumCol.toAttribute,
Literal(0L)), aggregatePlan)
+ val genRowPlan = Generate(
+ ReplicateRows(Seq(aggSumCol.toAttribute) ++ left.output),
+ Nil,
+ false,
+ None,
--- End diff --
Please pass these positional literal arguments by name for readability, e.g. `qualifier = None`, ...
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]