[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21857 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205667472 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1); +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1), (2), (2), (3), (5), (5), (null) AS tab2(c1); +CREATE TEMPORARY VIEW tab3 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(2, 3), +(2, 2) +AS tab3(k, v); +CREATE TEMPORARY VIEW tab4 AS SELECT * FROM VALUES +(1, 2), +(2, 3), +(2, 2), +(2, 2), +(2, 20) +AS tab4(k, v); + +-- Basic ExceptAll +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2; + +-- ExceptAll same table in both branches +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2 WHERE c1 IS NOT NULL; + +-- Empty left relation +SELECT * FROM tab1 WHERE c1 > 5 +EXCEPT ALL +SELECT * FROM tab2; + +-- Empty right relation +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2 WHERE c1 > 6; + +-- Type Coerced ExceptAll +SELECT * FROM tab1 +EXCEPT ALL +SELECT CAST(1 AS BIGINT); + +-- Error as types of two side are not compatible +SELECT * FROM tab1 +EXCEPT ALL +SELECT array(1); + +-- Basic +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4; + +-- Basic +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3; + +-- ExceptAll + Intersect +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3 +INTERSECT DISTINCT +SELECT * FROM tab4; + +-- ExceptAll + Except +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +UNION ALL +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Mismatch on number of columns across both branches +SELECT k FROM tab3 +EXCEPT ALL +SELECT k, v FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +UNION +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +EXCEPT DISTINCT +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Join under except all. Should produce empty resultset since both left and right sets +-- are same. +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k) +EXCEPT ALL +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k); + +-- Join under except all (2) +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k) +EXCEPT ALL +SELECT * +FROM (SELECT tab4.v AS k, + tab3.k AS v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k); + +-- Group by under ExceptAll +SELECT v FROM tab3 GROUP BY v +EXCEPT ALL +SELECT k FROM tab4 GROUP BY k --- End diff -- @gatorsmile Thank you. fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205663395 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1); +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1), (2), (2), (3), (5), (5), (null) AS tab2(c1); +CREATE TEMPORARY VIEW tab3 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(2, 3), +(2, 2) +AS tab3(k, v); +CREATE TEMPORARY VIEW tab4 AS SELECT * FROM VALUES +(1, 2), +(2, 3), +(2, 2), +(2, 2), +(2, 20) +AS tab4(k, v); + +-- Basic ExceptAll +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2; + +-- ExceptAll same table in both branches +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2 WHERE c1 IS NOT NULL; + +-- Empty left relation +SELECT * FROM tab1 WHERE c1 > 5 +EXCEPT ALL +SELECT * FROM tab2; + +-- Empty right relation +SELECT * FROM tab1 +EXCEPT ALL +SELECT * FROM tab2 WHERE c1 > 6; + +-- Type Coerced ExceptAll +SELECT * FROM tab1 +EXCEPT ALL +SELECT CAST(1 AS BIGINT); + +-- Error as types of two side are not compatible +SELECT * FROM tab1 +EXCEPT ALL +SELECT array(1); + +-- Basic +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4; + +-- Basic +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3; + +-- ExceptAll + Intersect +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3 +INTERSECT DISTINCT +SELECT * FROM tab4; + +-- ExceptAll + Except +SELECT * FROM tab4 +EXCEPT ALL +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +UNION ALL +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Mismatch on number of columns across both branches +SELECT k FROM tab3 +EXCEPT ALL +SELECT k, v FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +UNION +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Chain of set operations +SELECT * FROM tab3 +EXCEPT ALL +SELECT * FROM tab4 +EXCEPT DISTINCT +SELECT * FROM tab3 +EXCEPT DISTINCT +SELECT * FROM tab4; + +-- Join under except all. Should produce empty resultset since both left and right sets +-- are same. +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k) +EXCEPT ALL +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k); + +-- Join under except all (2) +SELECT * +FROM (SELECT tab3.k, + tab4.v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k) +EXCEPT ALL +SELECT * +FROM (SELECT tab4.v AS k, + tab3.k AS v +FROM tab3 + JOIN tab4 + ON tab3.k = tab4.k); + +-- Group by under ExceptAll +SELECT v FROM tab3 GROUP BY v +EXCEPT ALL +SELECT k FROM tab4 GROUP BY k --- End diff -- ; --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205656881 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) --- End diff -- Please don't forget to update the pr description as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205566658 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Except(left, right, true) => + assert(left.output.size == right.output.size) + + val newColumnLeft = Alias(Literal(1L), "vcol")() + val newColumnRight = Alias(Literal(-1L), "vcol")() + val modifiedLeftPlan = Project(Seq(newColumnLeft) ++ left.output, left) + val modifiedRightPlan = Project(Seq(newColumnRight) ++ right.output, right) + val unionPlan = Union(modifiedLeftPlan, modifiedRightPlan) + val aggSumCol = +Alias(AggregateExpression(Sum(unionPlan.output.head.toAttribute), Complete, false), "sum")() + val aggOutputColumns = left.output ++ Seq(aggSumCol) + val aggregatePlan = Aggregate(left.output, aggOutputColumns, unionPlan) + val filteredAggPlan = Filter(GreaterThan(aggSumCol.toAttribute, Literal(0L)), aggregatePlan) + val genRowPlan = Generate( +ReplicateRows(Seq(aggSumCol.toAttribute) ++ left.output), +Nil, +false, +None, --- End diff -- @gatorsmile will change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205566676 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1947,6 +1947,21 @@ class Dataset[T] private[sql]( Except(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows in this Dataset but not in another Dataset while + * preserving the duplicates. + * This is equivalent to `EXCEPT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 + */ + def exceptAll(other: Dataset[T]): Dataset[T] = withSetOperator { +Except(planWithBarrier, other.planWithBarrier, true) --- End diff -- @gatorsmile will change --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205566411 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -532,9 +532,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right) => throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") - case logical.Except(left, right) => + case logical.Except(left, right, false) => throw new IllegalStateException( "logical except operator should have been replaced by anti-join in the optimizer") + case logical.Except(left, right, true) => +throw new IllegalStateException( + "logical except operator should have been replaced by union, aggregate" + --- End diff -- ok.. will change to except (all).. the difference was in the replaced operators, actually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205566097 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) --- End diff -- yes.. thanks.. will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205565594 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -532,9 +532,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right) => throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") - case logical.Except(left, right) => + case logical.Except(left, right, false) => throw new IllegalStateException( "logical except operator should have been replaced by anti-join in the optimizer") + case logical.Except(left, right, true) => +throw new IllegalStateException( + "logical except operator should have been replaced by union, aggregate" + --- End diff -- hmm, it looks no diff to above one. Maybe `except (all) operator`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205565280 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) --- End diff -- So I think here it should be `replicate_rows(sum_val, c1) AS c1`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205563598 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -532,9 +532,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right) => throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") - case logical.Except(left, right) => + case logical.Except(left, right, false) => throw new IllegalStateException( "logical except operator should have been replaced by anti-join in the optimizer") + case logical.Except(left, right, true) => +throw new IllegalStateException( + "logical except operator should have been replaced by union, aggregate" + --- End diff -- there is no operator called "except all " ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205563161 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) --- End diff -- @viirya Yes it does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205558989 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) --- End diff -- Does `replicate_rows`'s output include `sum_val`? I think it removes the multiplier value from output? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205561228 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -532,9 +532,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right) => throw new IllegalStateException( "logical intersect operator should have been replaced by semi-join in the optimizer") - case logical.Except(left, right) => + case logical.Except(left, right, false) => throw new IllegalStateException( "logical except operator should have been replaced by anti-join in the optimizer") + case logical.Except(left, right, true) => +throw new IllegalStateException( + "logical except operator should have been replaced by union, aggregate" + --- End diff -- except all --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205561601 --- Diff: python/pyspark/sql/dataframe.py --- @@ -293,6 +293,28 @@ def explain(self, extended=False): else: print(self._jdf.queryExecution().simpleString()) +@since(2.4) +def exceptAll(self, other): +"""Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but +not in another :class:`DataFrame` while preserving duplicates. + +This is equivalent to `EXCEPT ALL` in SQL. + +>>> df1 = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"]) --- End diff -- Maybe it is better to add one more row to show the behavior of `preserving duplicates`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205560454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1947,6 +1947,21 @@ class Dataset[T] private[sql]( Except(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows in this Dataset but not in another Dataset while + * preserving the duplicates. --- End diff -- I think it is good to mention `resolves columns by position (not by name)` here too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205554196 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1947,6 +1947,21 @@ class Dataset[T] private[sql]( Except(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows in this Dataset but not in another Dataset while + * preserving the duplicates. + * This is equivalent to `EXCEPT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 + */ + def exceptAll(other: Dataset[T]): Dataset[T] = withSetOperator { +Except(planWithBarrier, other.planWithBarrier, true) --- End diff -- isAll = true --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205554069 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate --- End diff -- Ah.. sorry.. will change it to Except. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205553527 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case Except(left, right, true) => + assert(left.output.size == right.output.size) + + val newColumnLeft = Alias(Literal(1L), "vcol")() + val newColumnRight = Alias(Literal(-1L), "vcol")() + val modifiedLeftPlan = Project(Seq(newColumnLeft) ++ left.output, left) + val modifiedRightPlan = Project(Seq(newColumnRight) ++ right.output, right) + val unionPlan = Union(modifiedLeftPlan, modifiedRightPlan) + val aggSumCol = +Alias(AggregateExpression(Sum(unionPlan.output.head.toAttribute), Complete, false), "sum")() + val aggOutputColumns = left.output ++ Seq(aggSumCol) + val aggregatePlan = Aggregate(left.output, aggOutputColumns, unionPlan) + val filteredAggPlan = Filter(GreaterThan(aggSumCol.toAttribute, Literal(0L)), aggregatePlan) + val genRowPlan = Generate( +ReplicateRows(Seq(aggSumCol.toAttribute) ++ left.output), +Nil, +false, +None, --- End diff -- `qualifier = None`, --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205552895 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case Except(left, right) => +case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate --- End diff -- ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205359199 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val --- End diff -- @ueshin Done. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205357936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val --- End diff -- We can remove it from the pr description as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205352189 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- @gatorsmile @maropu I have removed ExceptAll operator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205331401 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- @gatorsmile @maropu OK --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205325587 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- I am fine about that. Please make a change and avoid introducing a new LogicalPlan node. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205289050 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val --- End diff -- @ueshin Thanks.. will change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204977789 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val --- End diff -- nit: there is no `cnt`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204891972 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { --- End diff -- @maropu I have added a unit test to check the plan. Please look at it when you get a chance. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r20408 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- @gatorsmile Thats right Sean. We will not need changes here. However may i request you to please command-B on Except class ? We may need to change the pattern matching in other places, right ? Just wanted to make sure you are okay with it before i went ahead and made the changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204887497 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper { } protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { -case _: Intersect | _: Except | _: Distinct => +case _: Intersect | _: ExceptBase | _: Distinct => --- End diff -- then, we do not need these changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204887309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- Let us avoid adding a new logical plan node. : ) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204886904 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1919,6 +1919,21 @@ class Dataset[T] private[sql]( Except(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows in this Dataset but not in another Dataset while + * preserving the duplicates. + * This is equivalent to `EXCEPT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 --- End diff -- We are still targeting it to 2.4. If we are unable to make it, we can change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204777353 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- @maropu Some details to aid the decision making. I remember now.. This way, i had to change less number of files. I just looked at the usage of Except to double check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204775684 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { --- End diff -- ah, ok. It sounds ok to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204775243 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { --- End diff -- Ah.. ok... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204774985 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(0),(1),(2),(2),(2),(2),(3),(null),(null) AS tab1(c1) ; --- End diff -- @maropu Will do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204774728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { --- End diff -- @maropu I would like to take this in a follow up. I think we have codegen disabled for generators in general. So we will not be able to take advantage of it ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204773666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { + private lazy val numColumns = children.length - 1 // remove the multiplier value from output. + + override def elementSchema: StructType = +StructType(children.tail.zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) + }) --- End diff -- @maropu will check and fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204773452 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES --- End diff -- @maropu I thought we like to keep these sql files relatively small and not contain too many sqls. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204773466 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- ok cc: @gatorsmile @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204772193 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- @maropu Right. So this way , most of the pattern matching happens on the the Base class where things are common. I went back and forth on this as well.. If there is a consensus i will change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204763786 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(0),(1),(2),(2),(2),(2),(3),(null),(null) AS tab1(c1) ; --- End diff -- super nit: need spaces like `(0), (1), (2), (3), ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204760604 --- Diff: sql/core/src/test/resources/sql-tests/inputs/except-all.sql --- @@ -0,0 +1,146 @@ +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES --- End diff -- I feel it's ok to move these tests to `except.sql`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204760405 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -182,14 +182,16 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - +abstract class ExceptBase(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output override protected def validConstraints: Set[Expression] = leftConstraints } +case class Except(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) +case class ExceptAll(left: LogicalPlan, right: LogicalPlan) extends ExceptBase(left, right) --- End diff -- We need a logical node for `ExceptAll`? As another option, we can add a flag as a field value of `Except` (`case class Except(left: LogicalPlan, right: LogicalPlan, all: Boolean)`? That's because `Except` and `ExceptAll` is not different for the analyzer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204758133 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1919,6 +1919,21 @@ class Dataset[T] private[sql]( Except(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows in this Dataset but not in another Dataset while + * preserving the duplicates. + * This is equivalent to `EXCEPT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 --- End diff -- The feature freeze will come soon, so I'm not sure this will appear in v2.4. cc: @gatorsmile @rxin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204762822 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { --- End diff -- We don't support codegen? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204761432 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { + private lazy val numColumns = children.length - 1 // remove the multiplier value from output. + + override def elementSchema: StructType = +StructType(children.tail.zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) + }) --- End diff -- nit: need indents --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204763415 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1275,6 +1276,64 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, cnt, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + *GROUP BY union_all.c1 + * ) + *WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { --- End diff -- We need tests for the `RewriteExceptAll rule (you can check `RewriteDistinctAggregatesSuite` as a reference). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204750587 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND --- End diff -- nit: `a internal` -> `an internal` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204743359 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,32 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback { + private lazy val numColumns = children.length - 1 // remove the multiplier value from output. + + override def elementSchema: StructType = +StructType(children.tail.zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) + }) + + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { +val numRows = children.head.eval(input).asInstanceOf[Long] +val values = children.tail.map(_.eval(input)).toArray +Range.Long(0, numRows, 1).map { i => --- End diff -- nit: `i` -> `_` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204679789 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,37 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +@ExpressionDescription( --- End diff -- @HyukjinKwon OK.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21857: [SPARK-21274][SQL] Implement EXCEPT ALL clause.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r204677899 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -222,6 +222,37 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +/** + * Replicate the row N times. N is specified as the first argument to the function. + * This is a internal function solely used by optimizer to rewrite EXCEPT ALL AND + * INTERSECT ALL queries. + */ +@ExpressionDescription( --- End diff -- If it's for an internal purpose, you can just remove this though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org