[jira] [Created] (SPARK-29060) Add tree traversal helper for adaptive spark plans
Maryann Xue created SPARK-29060: --- Summary: Add tree traversal helper for adaptive spark plans Key: SPARK-29060 URL: https://issues.apache.org/jira/browse/SPARK-29060 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue -- This message was sent by Atlassian Jira (v8.3.2#803003) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-29002) Avoid changing SMJ to BHJ if the build side has a high ratio of empty partitions
Maryann Xue created SPARK-29002: --- Summary: Avoid changing SMJ to BHJ if the build side has a high ratio of empty partitions Key: SPARK-29002 URL: https://issues.apache.org/jira/browse/SPARK-29002 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue
[jira] [Created] (SPARK-28959) Support `EqualNullSafe` as join condition in Dynamic Partition Pruning
Maryann Xue created SPARK-28959: --- Summary: Support `EqualNullSafe` as join condition in Dynamic Partition Pruning Key: SPARK-28959 URL: https://issues.apache.org/jira/browse/SPARK-28959 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue See [https://github.com/apache/spark/pull/25600#discussion_r318415489].
[jira] [Created] (SPARK-28888) Implement Dynamic Partition Pruning
Maryann Xue created SPARK-28888: --- Summary: Implement Dynamic Partition Pruning Key: SPARK-28888 URL: https://issues.apache.org/jira/browse/SPARK-28888 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue
[jira] [Created] (SPARK-28753) Dynamically reuse subqueries in AQE
Maryann Xue created SPARK-28753: --- Summary: Dynamically reuse subqueries in AQE Key: SPARK-28753 URL: https://issues.apache.org/jira/browse/SPARK-28753 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue
[jira] [Created] (SPARK-28583) Subqueries should not call `onUpdatePlan` in Adaptive Query Execution
Maryann Xue created SPARK-28583: --- Summary: Subqueries should not call `onUpdatePlan` in Adaptive Query Execution Key: SPARK-28583 URL: https://issues.apache.org/jira/browse/SPARK-28583 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue Subqueries do not have their own execution id, thus when calling {{AdaptiveSparkPlanExec.onUpdatePlan}}, it will actually get the {{QueryExecution}} instance of the main query, which is wasteful and problematic. It could cause issues like stack overflows or deadlocks in some circumstances.
[jira] [Created] (SPARK-28057) Add method `clone` in catalyst TreeNode
Maryann Xue created SPARK-28057: --- Summary: Add method `clone` in catalyst TreeNode Key: SPARK-28057 URL: https://issues.apache.org/jira/browse/SPARK-28057 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue Add an implementation for the {{clone}} method in {{TreeNode}}, for de-duplicating instances in the LogicalPlan tree. This is a prerequisite for SPARK-23128.
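The de-duplication motivation can be sketched with a toy tree (plain Python with made-up names, not Spark's actual TreeNode API): when the same node instance is referenced from two places in a plan, a recursive clone yields a tree of distinct instances.

```python
# Toy sketch of tree-node cloning (illustrative only; not Spark's TreeNode).
class Node:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)

    def clone(self):
        # Recursively copy so no instance is shared between subtrees.
        return Node(self.name, [c.clone() for c in self.children])

shared = Node("scan")                  # the same instance is used twice
plan = Node("union", [shared, shared])
copy = plan.clone()
assert copy.children[0] is not copy.children[1]  # duplicates eliminated
```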
[jira] [Created] (SPARK-27824) Make rule EliminateResolvedHint idempotent
Maryann Xue created SPARK-27824: --- Summary: Make rule EliminateResolvedHint idempotent Key: SPARK-27824 URL: https://issues.apache.org/jira/browse/SPARK-27824 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue This is to make sure we get the correct outcome even if the {{EliminateResolvedHint}} rule gets applied more than once.
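Idempotence here means that applying the rule a second time must be a no-op. A toy sketch (plain Python, illustrative; not the real EliminateResolvedHint, which works on Catalyst plans):

```python
# Toy rule that strips ("hint", child) wrapper nodes from a nested-tuple plan.
def eliminate_hints(plan):
    if isinstance(plan, tuple) and plan[0] == "hint":
        return eliminate_hints(plan[1])        # unwrap the hint node
    if isinstance(plan, tuple):
        return tuple(eliminate_hints(p) for p in plan)
    return plan

p = ("join", ("hint", ("scan", "t1")), ("scan", "t2"))
once = eliminate_hints(p)
# Idempotent: a second application changes nothing.
assert eliminate_hints(once) == once
```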
[jira] [Created] (SPARK-27783) Add customizable hint error handler
Maryann Xue created SPARK-27783: --- Summary: Add customizable hint error handler Key: SPARK-27783 URL: https://issues.apache.org/jira/browse/SPARK-27783 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue Add a customizable hint error handler whose default behavior is to log warnings.
[jira] [Created] (SPARK-27236) Refactor log-appender pattern in tests
Maryann Xue created SPARK-27236: --- Summary: Refactor log-appender pattern in tests Key: SPARK-27236 URL: https://issues.apache.org/jira/browse/SPARK-27236 Project: Spark Issue Type: Improvement Components: Tests Affects Versions: 2.4.0 Reporter: Maryann Xue There are a few places in tests where similar patterns of "withLogAppender()" are used. This is a common requirement across tests, so it can be extracted into a utility method.
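A sketch of what such a utility could look like, translated to Python's logging module for illustration (the Spark version attaches a log4j appender in Scala; all names here are made up):

```python
# A reusable "withLogAppender"-style helper: capture log records emitted while
# a block runs, and always detach the handler afterwards, even on failure.
import logging
from contextlib import contextmanager

@contextmanager
def with_log_capture(logger_name):
    records = []
    handler = logging.Handler()
    handler.emit = records.append          # collect every record verbatim
    logger = logging.getLogger(logger_name)
    logger.addHandler(handler)
    try:
        yield records
    finally:
        logger.removeHandler(handler)      # clean up like a try/finally in tests

with with_log_capture("demo") as records:
    logging.getLogger("demo").warning("plan changed")
assert records[0].getMessage() == "plan changed"
```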
[jira] [Created] (SPARK-27225) Implement join strategy hints
Maryann Xue created SPARK-27225: --- Summary: Implement join strategy hints Key: SPARK-27225 URL: https://issues.apache.org/jira/browse/SPARK-27225 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue Extend the existing BROADCAST join hint by implementing other join strategy hints corresponding to the rest of Spark's existing join strategies: shuffle-hash, sort-merge, cartesian-product. Broadcast-nested-loop will use BROADCAST hint as it does now.
[jira] [Updated] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
[ https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-27223: Summary: Remove private methods that skip conversion when passing user schemas for constructing a DataFrame (was: Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame) > Remove private methods that skip conversion when passing user schemas for > constructing a DataFrame > -- > > Key: SPARK-27223 > URL: https://issues.apache.org/jira/browse/SPARK-27223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Priority: Minor > > When passing in a user schema to create a DataFrame, there might be > mismatched nullability between the user schema and the actual data. All > related public interfaces now perform catalyst conversion using the user > provided schema, which catches such mismatches to avoid runtime errors later > on. However, there are private methods which allow this conversion to be > skipped, so we need to remove these private methods, which may lead to > confusion and potential issues.
[jira] [Created] (SPARK-27223) Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame
Maryann Xue created SPARK-27223: --- Summary: Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame Key: SPARK-27223 URL: https://issues.apache.org/jira/browse/SPARK-27223 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue When passing in a user schema to create a DataFrame, there might be mismatched nullability between the user schema and the actual data. All related public interfaces now perform catalyst conversion using the user provided schema, which catches such mismatches to avoid runtime errors later on. However, there are private methods which allow this conversion to be skipped, so we need to remove these private methods, which may lead to confusion and potential issues.
[jira] [Created] (SPARK-27088) Apply conf "spark.sql.optimizer.planChangeLog.level" to batch plan change in RuleExecutor
Maryann Xue created SPARK-27088: --- Summary: Apply conf "spark.sql.optimizer.planChangeLog.level" to batch plan change in RuleExecutor Key: SPARK-27088 URL: https://issues.apache.org/jira/browse/SPARK-27088 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue Similar to SPARK-25415, which made the log level for plan changes by each rule configurable, we can make the log level for plan changes by each batch configurable too, reusing the same configuration: "spark.sql.optimizer.planChangeLog.level".
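A hedged config sketch, assuming an active `SparkSession` bound to `spark`; the companion `planChangeLog.rules` key is listed on the assumption that the SPARK-25415 family of confs applies in your Spark version:

```python
# Config sketch: one level setting is reused for both per-rule and, after this
# change, per-batch plan-change logging.
spark.conf.set("spark.sql.optimizer.planChangeLog.level", "WARN")
# Optionally narrow the output to specific rules (assumed companion conf):
spark.conf.set("spark.sql.optimizer.planChangeLog.rules",
               "org.apache.spark.sql.catalyst.optimizer.CombineFilters")
```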
[jira] [Updated] (SPARK-26840) Avoid cost-based join reorder in presence of join hints
[ https://issues.apache.org/jira/browse/SPARK-26840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-26840: Description: This is a fix for [https://github.com/apache/spark/pull/23524], which did not stop cost-based join reorder when the {{CostBasedJoinReorder}} rule recurses down the tree and applies join reorder for nested joins with hints. > Avoid cost-based join reorder in presence of join hints > --- > > Key: SPARK-26840 > URL: https://issues.apache.org/jira/browse/SPARK-26840 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maryann Xue >Priority: Minor > > This is a fix for [https://github.com/apache/spark/pull/23524], > which did not stop cost-based join reorder when the {{CostBasedJoinReorder}} > rule recurses down the tree and applies join reorder for nested joins with > hints.
[jira] [Created] (SPARK-26840) Avoid cost-based join reorder in presence of join hints
Maryann Xue created SPARK-26840: --- Summary: Avoid cost-based join reorder in presence of join hints Key: SPARK-26840 URL: https://issues.apache.org/jira/browse/SPARK-26840 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Maryann Xue
[jira] [Created] (SPARK-26065) Change query hint from a `LogicalPlan` to a field
Maryann Xue created SPARK-26065: --- Summary: Change query hint from a `LogicalPlan` to a field Key: SPARK-26065 URL: https://issues.apache.org/jira/browse/SPARK-26065 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue The existing query hint implementation relies on a logical plan node {{ResolvedHint}} to store query hints in logical plans, and on {{Statistics}} in physical plans. Since {{ResolvedHint}} is not really a logical operator and can break the pattern matching for existing and future optimization rules, it is as much an issue to the Optimizer as the old {{AnalysisBarrier}} was to the Analyzer. Given the fact that all our query hints are either 1) a join hint, i.e., the broadcast hint; or 2) a re-partition hint, which is indeed an operator, we only need to add a hint field on the {{Join}} plan, and that will be a good enough solution for current hint usage.
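The pattern-matching concern can be sketched in a toy model (plain Python with made-up names, not Catalyst itself): a rule that matches on the join node fails to fire when a hint wrapper sits above it, but keeps working when the hint is just a field:

```python
# Toy sketch: wrapper node vs. hint field.
from dataclasses import dataclass, field

@dataclass
class Join:
    left: str
    right: str
    hint: dict = field(default_factory=dict)   # proposed: hint lives on Join

@dataclass
class ResolvedHint:                            # old: hint is its own plan node
    child: object

def is_join(plan):
    return isinstance(plan, Join)              # a rule matching on Join

wrapped = ResolvedHint(Join("a", "b"))         # the wrapper hides the Join
flagged = Join("a", "b", hint={"broadcast": True})
assert not is_join(wrapped) and is_join(flagged)
```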
[jira] [Updated] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate
[ https://issues.apache.org/jira/browse/SPARK-25914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25914: Description: Currently the Spark SQL logical Aggregate has two expression fields: {{groupingExpressions}} and {{aggregateExpressions}}, in which {{aggregateExpressions}} is actually the result expressions, or in other words, the project list in the SELECT clause. This would cause an exception while processing the following query: {code:java} SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's'){code} After optimization, the query becomes: {code:java} SELECT concat('x', a, 's') FROM testData2 GROUP BY concat(a, 's'){code} The optimization rule {{CombineConcats}} optimizes the expressions by flattening "concat" and causes the query to fail, since the expression {{concat('x', a, 's')}} in the SELECT clause references neither a grouping expression nor an aggregate expression. The problem is that we try to mix two operations in one operator, and worse, in one field: the group-and-aggregate operation and the project operation. There are two ways to solve this problem: 1. Break the two operations into two logical operators, which means a group-by query can usually be mapped into a Project-over-Aggregate pattern. 2. Break the two operations into multiple fields in the Aggregate operator, the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}} or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} would contain aggregate functions only, and {{resultExpressions}} would be the project list in the SELECT clause holding references to either {{groupingExpressions}} or {{aggregateExpressions}}. I would say option 1 is cleaner, but it would be more likely to break the pattern matching in existing optimization rules and thus require more changes in the compiler.
So we'd probably want to go with option 2. That said, I suggest we achieve this goal through two iterative steps: Phase 1: Keep the current fields of logical Aggregate as {{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics of {{aggregateExpressions}} by replacing the grouping expressions with corresponding references to expressions in {{groupingExpressions}}. The aggregate expressions in {{aggregateExpressions}} will remain the same. Phase 2: Add {{resultExpressions}} for the project list, and keep only aggregate expressions in {{aggregateExpressions}}. was: Currently the Spark SQL logical Aggregate has two expression fields: {{groupingExpressions}} and {{aggregateExpressions}}, in which {{aggregateExpressions}} is actually the result expressions, or in other words, the project list in the SELECT clause. This would cause an exception while processing the following query: {code:java} SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's'){code} After optimization, the query becomes: {code:java} SELECT concat('x', a, 's') FROM testData2 GROUP BY concat(a, 's'){code} The optimization rule {{CombineConcats}} optimizes the expressions by flattening "concat" and causes the query to fail, since the expression {{concat('x', a, 's')}} in the SELECT clause references neither a grouping expression nor an aggregate expression. The problem is that we try to mix two operations in one operator, and worse, in one field: the group-and-aggregate operation and the project operation. There are two ways to solve this problem: 1. Break the two operations into two logical operators, which means a group-by query can usually be mapped into a Project-over-Aggregate pattern. 2. Break the two operations into multiple fields in the Aggregate operator, the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}} or {{SortAggregateExec}}).
Thus, {{groupingExpressions}} would still be the expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} would contain aggregate functions only, and {{resultExpressions}} would be the project list in the SELECT clause holding references to either {{groupingExpressions}} or {{aggregateExpressions}}. I would say option 1 is cleaner, but it would be more likely to break the pattern matching in existing optimization rules and thus require more changes in the compiler. So we'd probably want to go with option 2. That said, I suggest we achieve this goal through two iterative steps: Phase 1: Keep the current fields of logical Aggregate as {{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics of {{aggregateExpressions}} by replacing the grouping expressions with corresponding references to expressions in {{groupingExpressions}}. The aggregate expressions in {{aggregateExpressions}} will remain the same. Phase 2: Add {{resultExpressions}} for the project list, and keep only aggregate expressions in {{aggregateExpressions}}.
[jira] [Created] (SPARK-25916) Add `resultExpressions` in logical `Aggregate`
Maryann Xue created SPARK-25916: --- Summary: Add `resultExpressions` in logical `Aggregate` Key: SPARK-25916 URL: https://issues.apache.org/jira/browse/SPARK-25916 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue See the parent JIRA description.
[jira] [Created] (SPARK-25915) Replace grouping expressions with references in `aggregateExpressions` of logical `Aggregate`
Maryann Xue created SPARK-25915: --- Summary: Replace grouping expressions with references in `aggregateExpressions` of logical `Aggregate` Key: SPARK-25915 URL: https://issues.apache.org/jira/browse/SPARK-25915 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue See the parent JIRA description.
[jira] [Updated] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate
[ https://issues.apache.org/jira/browse/SPARK-25914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25914: Description: Currently the Spark SQL logical Aggregate has two expression fields: {{groupingExpressions}} and {{aggregateExpressions}}, in which {{aggregateExpressions}} is actually the result expressions, or in other words, the project list in the SELECT clause. This would cause an exception while processing the following query: {code:java} SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's'){code} After optimization, the query becomes: {code:java} SELECT concat('x', a, 's') FROM testData2 GROUP BY concat(a, 's'){code} The optimization rule {{CombineConcats}} optimizes the expressions by flattening "concat" and causes the query to fail, since the expression {{concat('x', a, 's')}} in the SELECT clause references neither a grouping expression nor an aggregate expression. The problem is that we try to mix two operations in one operator, and worse, in one field: the group-and-aggregate operation and the project operation. There are two ways to solve this problem: 1. Break the two operations into two logical operators, which means a group-by query can usually be mapped into a Project-over-Aggregate pattern. 2. Break the two operations into multiple fields in the Aggregate operator, the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}} or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} would contain aggregate functions only, and {{resultExpressions}} would be the project list in the SELECT clause holding references to either {{groupingExpressions}} or {{aggregateExpressions}}. I would say option 1 is cleaner, but it would be more likely to break the pattern matching in existing optimization rules and thus require more changes in the compiler.
So we'd probably want to go with option 2. That said, I suggest we achieve this goal through two iterative steps: Phase 1: Keep the current fields of logical Aggregate as {{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics of {{aggregateExpressions}} by replacing the grouping expressions with corresponding references to expressions in {{groupingExpressions}}. The aggregate expressions in {{aggregateExpressions}} will remain the same. Phase 2: Add {{resultExpressions}} for the project list, and keep only aggregate expressions in {{aggregateExpressions}}. was: Currently the Spark SQL logical Aggregate has two expression fields: {{groupingExpressions}} and {{aggregateExpressions}}, in which {{aggregateExpressions}} is actually the result expressions, or in other words, the project list in the SELECT clause. This would cause an exception while processing the following query: SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's') After optimization, the query becomes: SELECT concat('x', a, 's') FROM testData2 GROUP BY concat(a, 's') The optimization rule {{CombineConcats}} optimizes the expressions by flattening "concat" and causes the query to fail, since the expression {{concat('x', a, 's')}} in the SELECT clause references neither a grouping expression nor an aggregate expression. The problem is that we try to mix two operations in one operator, and worse, in one field: the group-and-aggregate operation and the project operation. There are two ways to solve this problem: 1. Break the two operations into two logical operators, which means a group-by query can usually be mapped into a Project-over-Aggregate pattern. 2. Break the two operations into multiple fields in the Aggregate operator, the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}} or {{SortAggregateExec}}).
Thus, {{groupingExpressions}} would still be the expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} would contain aggregate functions only, and {{resultExpressions}} would be the project list in the SELECT clause holding references to either {{groupingExpressions}} or {{aggregateExpressions}}. I would say option 1 is cleaner, but it would be more likely to break the pattern matching in existing optimization rules and thus require more changes in the compiler. So we'd probably want to go with option 2. That said, I suggest we achieve this goal through two iterative steps: Phase 1: Keep the current fields of logical Aggregate as {{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics of {{aggregateExpressions}} by replacing the grouping expressions with corresponding references to expressions in {{groupingExpressions}}. The aggregate expressions in {{aggregateExpressions}} will remain the same. Phase 2: Add {{resultExpressions}} for the project list, and keep only aggregate expressions in {{aggregateExpressions}}.
[jira] [Created] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate
Maryann Xue created SPARK-25914: --- Summary: Separate projection from grouping and aggregate in logical Aggregate Key: SPARK-25914 URL: https://issues.apache.org/jira/browse/SPARK-25914 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue Currently the Spark SQL logical Aggregate has two expression fields: {{groupingExpressions}} and {{aggregateExpressions}}, in which {{aggregateExpressions}} is actually the result expressions, or in other words, the project list in the SELECT clause. This would cause an exception while processing the following query: {code:java} SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's'){code} After optimization, the query becomes: {code:java} SELECT concat('x', a, 's') FROM testData2 GROUP BY concat(a, 's'){code} The optimization rule {{CombineConcats}} optimizes the expressions by flattening "concat" and causes the query to fail, since the expression {{concat('x', a, 's')}} in the SELECT clause references neither a grouping expression nor an aggregate expression. The problem is that we try to mix two operations in one operator, and worse, in one field: the group-and-aggregate operation and the project operation. There are two ways to solve this problem: 1. Break the two operations into two logical operators, which means a group-by query can usually be mapped into a Project-over-Aggregate pattern. 2. Break the two operations into multiple fields in the Aggregate operator, the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}} or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} would contain aggregate functions only, and {{resultExpressions}} would be the project list in the SELECT clause holding references to either {{groupingExpressions}} or {{aggregateExpressions}}.
I would say option 1 is cleaner, but it would be more likely to break the pattern matching in existing optimization rules and thus require more changes in the compiler. So we'd probably want to go with option 2. That said, I suggest we achieve this goal through two iterative steps: Phase 1: Keep the current fields of logical Aggregate as {{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics of {{aggregateExpressions}} by replacing the grouping expressions with corresponding references to expressions in {{groupingExpressions}}. The aggregate expressions in {{aggregateExpressions}} will remain the same. Phase 2: Add {{resultExpressions}} for the project list, and keep only aggregate expressions in {{aggregateExpressions}}.
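Option 2 can be sketched with a toy model (plain Python, illustrative names only, not Catalyst):

```python
# Toy model of option 2: the SELECT list lives in result_exprs and refers to
# grouping/aggregate outputs by reference rather than by copied expression.
from dataclasses import dataclass

@dataclass
class Aggregate:
    grouping_exprs: list    # expressions from the GROUP BY clause
    aggregate_exprs: list   # aggregate functions only
    result_exprs: list      # SELECT list, built from references to the above

# SELECT concat('x', concat(a, 's')) ... GROUP BY concat(a, 's') becomes:
agg = Aggregate(
    grouping_exprs=["concat(a, 's')"],
    aggregate_exprs=[],
    result_exprs=[("concat", "'x'", ("group_ref", 0))],  # reference, not copy
)

# A flattening rule like CombineConcats may rewrite the grouping expression;
# the reference in result_exprs stays valid because it names the slot, not
# the expression text, so the GROUP BY match cannot be broken.
agg.grouping_exprs[0] = "concat(a, 's')  -- flattened"
assert agg.result_exprs[0][2] == ("group_ref", 0)
```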
[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely
[ https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25690: Description: This was fixed in SPARK-24891 and was then broken by SPARK-25044. The unit test in {{AnalysisSuite}} added in SPARK-24891 should have failed but didn't because it wasn't properly updated after the {{ScalaUDF}} constructor signature change. In the meantime, the other two end-to-end tests added in SPARK-24891 were shadowed by SPARK-24865. So the unit test mentioned above, if updated properly, can reproduce this issue: {code:java} test("SPARK-24891 Fix HandleNullInputsForUDF rule") { val a = testRelation.output(0) val func = (x: Int, y: Int) => x + y val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil) val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = false :: false :: Nil) val plan = Project(Alias(udf2, "")() :: Nil, testRelation) comparePlans(plan.analyze, plan.analyze.analyze) }{code} was: This was fixed in SPARK-24891 and was then broken by SPARK-25044. The tests added in SPARK-24891 were not good enough and the expected failures were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. Code changes and tests in [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72] can help reproduce the issue. > Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied > infinitely > --- > > Key: SPARK-25690 > URL: https://issues.apache.org/jira/browse/SPARK-25690 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Priority: Major > > This was fixed in SPARK-24891 and was then broken by SPARK-25044. > The unit test in {{AnalysisSuite}} added in SPARK-24891 should have failed > but didn't because it wasn't properly updated after the {{ScalaUDF}} > constructor signature change. 
In the meantime, the other two end-to-end tests > added in SPARK-24891 were shadowed by SPARK-24865. > So the unit test mentioned above, if updated properly, can reproduce this > issue: > {code:java} > test("SPARK-24891 Fix HandleNullInputsForUDF rule") { > val a = testRelation.output(0) > val func = (x: Int, y: Int) => x + y > val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false > :: false :: Nil) > val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = > false :: false :: Nil) > val plan = Project(Alias(udf2, "")() :: Nil, testRelation) > comparePlans(plan.analyze, plan.analyze.analyze) > }{code}
[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent
[ https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25650: Description: Rules like {{HandleNullInputsForUDF}} (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatching. Ideally, all rules whether in a once-policy batch or a fixed-point-policy batch should stabilize after the number of runs specified. Once-policy should be considered a performance improvement, a assumption that the rule can stabilize after just one run rather than an assumption that the rule won't be applied more than once. Those once-policy rules should be able to run fine with fixed-point policy rule as well. Currently we already have a check for fixed-point and throws an exception if maximum number of runs is reached and the plan is still changing. Here, in this PR, a similar check is added for once-policy and throws an exception if the plan changes between the first run and the second run of a once-policy rule. To reproduce this issue, go to [https://github.com/apache/spark/pull/22060], apply the changes and remove the specific rule from the whitelist https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R71. was: Rules like {{HandleNullInputsForUDF}} (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatching. Ideally, all rules whether in a once-policy batch or a fixed-point-policy batch should stabilize after the number of runs specified. Once-policy should be considered a performance improvement, a assumption that the rule can stabilize after just one run rather than an assumption that the rule won't be applied more than once. Those once-policy rules should be able to run fine with fixed-point policy rule as well. 
Currently we already have a check for fixed-point batches that throws an exception if the maximum number of runs is reached and the plan is still changing. Here, in this PR, a similar check is added for once-policy batches that throws an exception if the plan changes between the first run and the second run of a once-policy rule. To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] and apply the changes. > Make analyzer rules used in once-policy idempotent > -- > > Key: SPARK-25650 > URL: https://issues.apache.org/jira/browse/SPARK-25650 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 2.3.2 >Reporter: Maryann Xue >Priority: Major > > Rules like {{HandleNullInputsForUDF}} > (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can > apply new changes to a plan indefinitely) and can cause problems like SQL > cache mismatching. > Ideally, all rules, whether in a once-policy batch or a fixed-point-policy > batch, should stabilize after the number of runs specified. Once-policy should > be considered a performance improvement, an assumption that the rule can > stabilize after just one run rather than an assumption that the rule won't be > applied more than once. Those once-policy rules should be able to run fine > with a fixed-point-policy rule as well. > Currently we already have a check for fixed-point batches that throws an > exception if the maximum number of runs is reached and the plan is still > changing. Here, in this PR, a similar check is added for once-policy batches > that throws an exception if the plan changes between the first run and the > second run of a once-policy rule. > To reproduce this issue, go to [https://github.com/apache/spark/pull/22060], > apply the changes and remove the specific rule from the whitelist > https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R71. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
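The check proposed above can be sketched as follows. This is an illustrative Scala sketch with hypothetical {{Rule}} and {{Plan}} stand-ins, not Spark's actual {{RuleExecutor}} code:

{code:java}
// Toy stand-ins for Catalyst's Rule[LogicalPlan] and LogicalPlan.
// Names and signatures are hypothetical, for illustration only.
trait Plan { def fastEquals(other: Plan): Boolean }
trait Rule { def ruleName: String; def apply(plan: Plan): Plan }

// A once-policy batch: each rule is supposed to run once, but we re-apply
// it a second time and fail fast if the plan keeps changing, i.e. the rule
// is not idempotent.
def runOnceBatch(rules: Seq[Rule], plan: Plan): Plan =
  rules.foldLeft(plan) { (current, rule) =>
    val once  = rule(current)
    val twice = rule(once)
    if (!once.fastEquals(twice)) {
      throw new IllegalStateException(
        s"Once-policy rule ${rule.ruleName} changed the plan between " +
        "its first and second run")
    }
    once
  }
{code}

This mirrors the existing fixed-point check, which throws once the maximum number of runs is reached while the plan is still changing.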
[jira] [Created] (SPARK-25691) Analyzer rule "AliasViewChild" does not stabilize
Maryann Xue created SPARK-25691: --- Summary: Analyzer rule "AliasViewChild" does not stabilize Key: SPARK-25691 URL: https://issues.apache.org/jira/browse/SPARK-25691 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue To reproduce the issue: https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R73.
[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely
[ https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25690: Issue Type: Sub-task (was: Bug) Parent: SPARK-25650 > Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied > infinitely > --- > > Key: SPARK-25690 > URL: https://issues.apache.org/jira/browse/SPARK-25690 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Assignee: Sean Owen >Priority: Major > Fix For: 2.4.0 > > > This was fixed in SPARK-24891 and was then broken by SPARK-25044. > The tests added in SPARK-24891 were not good enough and the expected failures > were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. > Code changes and tests in > [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72] > can help reproduce the issue.
[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely
[ https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25690: Issue Type: Bug (was: Sub-task) Parent: (was: SPARK-14220) > Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied > infinitely > --- > > Key: SPARK-25690 > URL: https://issues.apache.org/jira/browse/SPARK-25690 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Assignee: Sean Owen >Priority: Major > Fix For: 2.4.0 > > > This was fixed in SPARK-24891 and was then broken by SPARK-25044. > The tests added in SPARK-24891 were not good enough and the expected failures > were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. > Code changes and tests in > [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72] > can help reproduce the issue.
[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely
[ https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25690: Description: This was fixed in SPARK-24891 and was then broken by SPARK-25044. The tests added in SPARK-24891 were not good enough and the expected failures were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. Code changes and tests in [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72] can help reproduce the issue. was: A few SQL-related tests fail in Scala 2.12, such as UDFSuite's "SPARK-24891 Fix HandleNullInputsForUDF rule": {code:java} - SPARK-24891 Fix HandleNullInputsForUDF rule *** FAILED *** Results do not match for query: ... == Results == !== Correct Answer - 3 == == Spark Answer - 3 == !struct<> struct ![0,10,null] [0,10,0] ![1,12,null] [1,12,1] ![2,14,null] [2,14,2] (QueryTest.scala:163){code} You can kind of get what's going on reading the test: {code:java} test("SPARK-24891 Fix HandleNullInputsForUDF rule") { // assume(!ClosureCleanerSuite2.supportsLMFs) // This test won't test what it intends to in 2.12, as lambda metafactory closures // have arg types that are not primitive, but Object val udf1 = udf({(x: Int, y: Int) => x + y}) val df = spark.range(0, 3).toDF("a") .withColumn("b", udf1($"a", udf1($"a", lit(10)))) .withColumn("c", udf1($"a", lit(null))) val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed comparePlans(df.logicalPlan, plan) checkAnswer( df, Seq( Row(0, 10, null), Row(1, 12, null), Row(2, 14, null))) }{code} It seems that the closure that is fed in as a UDF changes behavior, in a way that primitive-type arguments are handled differently. For example an Int argument, when fed 'null', acts like 0. I'm sure it's a difference in the LMF closure and how its types are understood, but not exactly sure of the cause yet. 
> Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied > infinitely > --- > > Key: SPARK-25690 > URL: https://issues.apache.org/jira/browse/SPARK-25690 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Assignee: Sean Owen >Priority: Major > Fix For: 2.4.0 > > > This was fixed in SPARK-24891 and was then broken by SPARK-25044. > The tests added in SPARK-24891 were not good enough and the expected failures > were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. > Code changes and tests in > [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72] > can help reproduce the issue.
[jira] [Created] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely
Maryann Xue created SPARK-25690: --- Summary: Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely Key: SPARK-25690 URL: https://issues.apache.org/jira/browse/SPARK-25690 Project: Spark Issue Type: Sub-task Components: Spark Core, SQL Affects Versions: 2.4.0 Reporter: Maryann Xue Assignee: Sean Owen Fix For: 2.4.0 A few SQL-related tests fail in Scala 2.12, such as UDFSuite's "SPARK-24891 Fix HandleNullInputsForUDF rule": {code:java} - SPARK-24891 Fix HandleNullInputsForUDF rule *** FAILED *** Results do not match for query: ... == Results == !== Correct Answer - 3 == == Spark Answer - 3 == !struct<> struct ![0,10,null] [0,10,0] ![1,12,null] [1,12,1] ![2,14,null] [2,14,2] (QueryTest.scala:163){code} You can kind of get what's going on reading the test: {code:java} test("SPARK-24891 Fix HandleNullInputsForUDF rule") { // assume(!ClosureCleanerSuite2.supportsLMFs) // This test won't test what it intends to in 2.12, as lambda metafactory closures // have arg types that are not primitive, but Object val udf1 = udf({(x: Int, y: Int) => x + y}) val df = spark.range(0, 3).toDF("a") .withColumn("b", udf1($"a", udf1($"a", lit(10)))) .withColumn("c", udf1($"a", lit(null))) val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed comparePlans(df.logicalPlan, plan) checkAnswer( df, Seq( Row(0, 10, null), Row(1, 12, null), Row(2, 14, null))) }{code} It seems that the closure that is fed in as a UDF changes behavior, in a way that primitive-type arguments are handled differently. For example an Int argument, when fed 'null', acts like 0. I'm sure it's a difference in the LMF closure and how its types are understood, but not exactly sure of the cause yet.
[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent
[ https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25650: Issue Type: Task (was: Bug) > Make analyzer rules used in once-policy idempotent > -- > > Key: SPARK-25650 > URL: https://issues.apache.org/jira/browse/SPARK-25650 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 2.3.2 >Reporter: Maryann Xue >Priority: Major > > Rules like {{HandleNullInputsForUDF}} > (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can > apply new changes to a plan indefinitely) and can cause problems like SQL > cache mismatching. > Ideally, all rules, whether in a once-policy batch or a fixed-point-policy > batch, should stabilize after the number of runs specified. Once-policy should > be considered a performance improvement, an assumption that the rule can > stabilize after just one run rather than an assumption that the rule won't be > applied more than once. Those once-policy rules should be able to run fine > with a fixed-point-policy rule as well. > Currently we already have a check for fixed-point batches that throws an > exception if the maximum number of runs is reached and the plan is still > changing. Here, in this PR, a similar check is added for once-policy batches > that throws an exception if the plan changes between the first run and the > second run of a once-policy rule.
[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent
[ https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25650: Description: Rules like {{HandleNullInputsForUDF}} (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatching. Ideally, all rules, whether in a once-policy batch or a fixed-point-policy batch, should stabilize after the number of runs specified. Once-policy should be considered a performance improvement, an assumption that the rule can stabilize after just one run rather than an assumption that the rule won't be applied more than once. Those once-policy rules should be able to run fine with a fixed-point-policy rule as well. Currently we already have a check for fixed-point batches that throws an exception if the maximum number of runs is reached and the plan is still changing. Here, in this PR, a similar check is added for once-policy batches that throws an exception if the plan changes between the first run and the second run of a once-policy rule. To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] and apply the changes. was: Rules like {{HandleNullInputsForUDF}} (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatching. Ideally, all rules, whether in a once-policy batch or a fixed-point-policy batch, should stabilize after the number of runs specified. Once-policy should be considered a performance improvement, an assumption that the rule can stabilize after just one run rather than an assumption that the rule won't be applied more than once. Those once-policy rules should be able to run fine with a fixed-point-policy rule as well. Currently we already have a check for fixed-point batches that throws an exception if the maximum number of runs is reached and the plan is still changing. 
Here, in this PR, a similar check is added for once-policy batches that throws an exception if the plan changes between the first run and the second run of a once-policy rule. > Make analyzer rules used in once-policy idempotent > -- > > Key: SPARK-25650 > URL: https://issues.apache.org/jira/browse/SPARK-25650 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 2.3.2 >Reporter: Maryann Xue >Priority: Major > > Rules like {{HandleNullInputsForUDF}} > (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can > apply new changes to a plan indefinitely) and can cause problems like SQL > cache mismatching. > Ideally, all rules, whether in a once-policy batch or a fixed-point-policy > batch, should stabilize after the number of runs specified. Once-policy should > be considered a performance improvement, an assumption that the rule can > stabilize after just one run rather than an assumption that the rule won't be > applied more than once. Those once-policy rules should be able to run fine > with a fixed-point-policy rule as well. > Currently we already have a check for fixed-point batches that throws an > exception if the maximum number of runs is reached and the plan is still > changing. Here, in this PR, a similar check is added for once-policy batches > that throws an exception if the plan changes between the first run and the > second run of a once-policy rule. > To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] > and apply the changes.
[jira] [Created] (SPARK-25650) Make analyzer rules used in once-policy idempotent
Maryann Xue created SPARK-25650: --- Summary: Make analyzer rules used in once-policy idempotent Key: SPARK-25650 URL: https://issues.apache.org/jira/browse/SPARK-25650 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.2 Reporter: Maryann Xue Rules like {{HandleNullInputsForUDF}} (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatching. Ideally, all rules, whether in a once-policy batch or a fixed-point-policy batch, should stabilize after the number of runs specified. Once-policy should be considered a performance improvement, an assumption that the rule can stabilize after just one run rather than an assumption that the rule won't be applied more than once. Those once-policy rules should be able to run fine with a fixed-point-policy rule as well. Currently we already have a check for fixed-point batches that throws an exception if the maximum number of runs is reached and the plan is still changing. Here, in this PR, a similar check is added for once-policy batches that throws an exception if the plan changes between the first run and the second run of a once-policy rule.
[jira] [Created] (SPARK-25505) The output order of grouping columns in Pivot is different from the input order
Maryann Xue created SPARK-25505: --- Summary: The output order of grouping columns in Pivot is different from the input order Key: SPARK-25505 URL: https://issues.apache.org/jira/browse/SPARK-25505 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue For example, {code} SELECT * FROM ( SELECT course, earnings, "a" as a, "z" as z, "b" as b, "y" as y, "c" as c, "x" as x, "d" as d, "w" as w FROM courseSales ) PIVOT ( sum(earnings) FOR course IN ('dotNET', 'Java') ) {code} The output columns should be "a, z, b, y, c, x, d, w, ..." but now it is "a, b, c, d, w, x, y, z, ..."
[jira] [Updated] (SPARK-25450) PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation
[ https://issues.apache.org/jira/browse/SPARK-25450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25450: Issue Type: Bug (was: Improvement) > PushProjectThroughUnion rule uses the same exprId for project expressions in > each Union child, causing mistakes in constant propagation > --- > > Key: SPARK-25450 > URL: https://issues.apache.org/jira/browse/SPARK-25450 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maryann Xue >Priority: Major >
[jira] [Created] (SPARK-25450) PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation
Maryann Xue created SPARK-25450: --- Summary: PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation Key: SPARK-25450 URL: https://issues.apache.org/jira/browse/SPARK-25450 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.1 Reporter: Maryann Xue
[jira] [Updated] (SPARK-25423) Output "dataFilters" in DataSourceScanExec.metadata
[ https://issues.apache.org/jira/browse/SPARK-25423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25423: Summary: Output "dataFilters" in DataSourceScanExec.metadata (was: Output "dataFilters" in DataSourceScanExec.toString) > Output "dataFilters" in DataSourceScanExec.metadata > --- > > Key: SPARK-25423 > URL: https://issues.apache.org/jira/browse/SPARK-25423 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maryann Xue >Priority: Trivial >
[jira] [Created] (SPARK-25423) Output "dataFilters" in DataSourceScanExec.toString
Maryann Xue created SPARK-25423: --- Summary: Output "dataFilters" in DataSourceScanExec.toString Key: SPARK-25423 URL: https://issues.apache.org/jira/browse/SPARK-25423 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.1 Reporter: Maryann Xue
[jira] [Created] (SPARK-25415) Make plan change log in RuleExecutor configurable by SQLConf
Maryann Xue created SPARK-25415: --- Summary: Make plan change log in RuleExecutor configurable by SQLConf Key: SPARK-25415 URL: https://issues.apache.org/jira/browse/SPARK-25415 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.1 Reporter: Maryann Xue In RuleExecutor, after applying a rule, if the plan has changed, the before and after plan will be logged using level "trace". At times, however, such information can be very helpful for debugging, so making the log level configurable in SQLConf would allow users to turn on the plan change log independently and save the trouble of tweaking log4j settings. Meanwhile, filtering plan change log for specific rules can also be very useful. So I propose adding two confs: 1. spark.sql.optimizer.planChangeLog.level - set a specific log level for logging plan changes after a rule is applied. 2. spark.sql.optimizer.planChangeLog.rules - enable plan change logging only for a set of specified rules, separated by commas.
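Assuming the two confs land as proposed, enabling the log for just the rules under investigation might look like this (illustrative sketch; the conf keys come from the proposal above, and the rule names are merely examples):

{code:java}
// Log plan changes at WARN so they show up without touching log4j settings.
spark.conf.set("spark.sql.optimizer.planChangeLog.level", "WARN")
// Restrict the log to specific rules (comma-separated fully qualified names).
spark.conf.set("spark.sql.optimizer.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.ColumnPruning," +
  "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")
{code}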
[jira] [Updated] (SPARK-25063) Rename class KnowNotNull to KnownNotNull
[ https://issues.apache.org/jira/browse/SPARK-25063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-25063: Description: It's a class name typo checked in through SPARK-24891 (was: It's a class name typo check-in through SPARK-24891) > Rename class KnowNotNull to KnownNotNull > > > Key: SPARK-25063 > URL: https://issues.apache.org/jira/browse/SPARK-25063 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Priority: Trivial > > It's a class name typo checked in through SPARK-24891
[jira] [Created] (SPARK-25063) Rename class KnowNotNull to KnownNotNull
Maryann Xue created SPARK-25063: --- Summary: Rename class KnowNotNull to KnownNotNull Key: SPARK-25063 URL: https://issues.apache.org/jira/browse/SPARK-25063 Project: Spark Issue Type: Task Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue It's a class name typo check-in through SPARK-24891
[jira] [Created] (SPARK-24972) PivotFirst could not handle pivot columns of complex types
Maryann Xue created SPARK-24972: --- Summary: PivotFirst could not handle pivot columns of complex types Key: SPARK-24972 URL: https://issues.apache.org/jira/browse/SPARK-24972 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1 Reporter: Maryann Xue {{PivotFirst}} did not handle complex types for pivot columns properly. As a result, the pivot column could not be matched with any pivot value, and the query always returned an empty result.
[jira] [Updated] (SPARK-24891) Fix HandleNullInputsForUDF rule
[ https://issues.apache.org/jira/browse/SPARK-24891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24891: Summary: Fix HandleNullInputsForUDF rule (was: Fix HandleNullInputsForUDF rule.) > Fix HandleNullInputsForUDF rule > --- > > Key: SPARK-24891 > URL: https://issues.apache.org/jira/browse/SPARK-24891 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maryann Xue >Priority: Major > > The HandleNullInputsForUDF rule can generate new {{If}} nodes indefinitely, > thus causing problems such as SQL cache misses.
[jira] [Created] (SPARK-24891) Fix HandleNullInputsForUDF rule.
Maryann Xue created SPARK-24891: --- Summary: Fix HandleNullInputsForUDF rule. Key: SPARK-24891 URL: https://issues.apache.org/jira/browse/SPARK-24891 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1 Reporter: Maryann Xue The HandleNullInputsForUDF rule can generate new {{If}} nodes indefinitely, thus causing problems such as SQL cache misses.
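The non-idempotence can be illustrated with a toy expression tree (a hypothetical Scala sketch, not Catalyst code): a rule that wraps every UDF in a null-check {{If}} grows the tree on every run, while a rule that marks the handled child (as the eventual fix does with a {{KnownNotNull}}-style marker) stabilizes after one run.

{code:java}
sealed trait Expr
case class Col(name: String) extends Expr
case class Udf(child: Expr) extends Expr
case class If(pred: Expr, onTrue: Expr, onFalse: Expr) extends Expr
case class KnownNotNull(child: Expr) extends Expr

// Bottom-up transform: rebuild children first, then apply the rule once
// at every original node (freshly created nodes are not revisited).
def transformUp(e: Expr)(rule: Expr => Expr): Expr = rule(e match {
  case Udf(c)          => Udf(transformUp(c)(rule))
  case If(p, t, f)     => If(transformUp(p)(rule), transformUp(t)(rule), transformUp(f)(rule))
  case KnownNotNull(c) => KnownNotNull(transformUp(c)(rule))
  case leaf            => leaf
})

// Non-idempotent: every run re-wraps the inner Udf in yet another If.
val bad: Expr => Expr = {
  case u: Udf => If(Col("isnull"), Col("null"), u)
  case other  => other
}

// Idempotent: the KnownNotNull marker stops a second wrap.
val good: Expr => Expr = {
  case u @ Udf(KnownNotNull(_)) => u
  case Udf(c)                   => If(Col("isnull"), Col("null"), Udf(KnownNotNull(c)))
  case other                    => other
}

val p0: Expr = Udf(Col("a"))
val g1 = transformUp(p0)(good)
assert(transformUp(g1)(good) == g1)  // stable after the first run
val b1 = transformUp(p0)(bad)
assert(transformUp(b1)(bad) != b1)   // keeps adding If layers every run
{code}

The toy {{good}} rule mirrors the shape of the real fix: the rewrite result carries a marker that makes the rule's own output unmatchable on the next run.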
[jira] [Updated] (SPARK-24802) Optimization Rule Exclusion
[ https://issues.apache.org/jira/browse/SPARK-24802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24802: Description: Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well. This would make customizing the Spark optimizer easier and could sometimes help with debugging issues too. # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being a list of rule names separated by commas. # Modify the current {{batches}} method to remove the excluded rules from the default batches. Log the rules that have been excluded. # Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded. was: Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well. This would make customizing the Spark optimizer easier and could sometimes help with debugging issues too. # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being a list of rule names separated by commas. # Modify the current {{batches}} method to remove the excluded rules from the default batches. Log the rules that have been excluded. # Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded. 
> Optimization Rule Exclusion > --- > > Key: SPARK-24802 > URL: https://issues.apache.org/jira/browse/SPARK-24802 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Priority: Major > > Since Spark has provided fairly clear interfaces for adding user-defined > optimization rules, it would be nice to have an easy-to-use interface for > excluding an optimization rule from the Spark query optimizer as well. > This would make customizing the Spark optimizer easier and could sometimes > help with debugging issues too. > # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value > being a list of rule names separated by commas. > # Modify the current {{batches}} method to remove the excluded rules from > the default batches. Log the rules that have been excluded. > # Split the existing default batches into "post-analysis batches" and > "optimization batches" so that only rules in the "optimization batches" can > be excluded.
[jira] [Created] (SPARK-24802) Optimization Rule Exclusion
Maryann Xue created SPARK-24802: --- Summary: Optimization Rule Exclusion Key: SPARK-24802 URL: https://issues.apache.org/jira/browse/SPARK-24802 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.3.0 Reporter: Maryann Xue Since Spark has provided fairly clear interfaces for adding user-defined optimization rules, it would be nice to have an easy-to-use interface for excluding an optimization rule from the Spark query optimizer as well. This would make customizing the Spark optimizer easier and could sometimes help with debugging issues too. # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being a list of rule names separated by commas. # Modify the current {{batches}} method to remove the excluded rules from the default batches. Log the rules that have been excluded. # Split the existing default batches into "post-analysis batches" and "optimization batches" so that only rules in the "optimization batches" can be excluded.
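As a sketch of how the proposed conf would be used (illustrative; the conf key comes from the list above, and the rule name is only an example):

{code:java}
// Exclude one optimizer rule for the current session; multiple rules
// would be listed comma-separated, per the proposal.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation")
{code}

Under the proposal, only rules in the "optimization batches" would honor this conf; post-analysis rules required for correctness could not be excluded.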
[jira] [Created] (SPARK-24790) Allow complex aggregate expressions in Pivot
Maryann Xue created SPARK-24790: --- Summary: Allow complex aggregate expressions in Pivot Key: SPARK-24790 URL: https://issues.apache.org/jira/browse/SPARK-24790 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of PIVOT's aggregate expressions was exerted, which allows only an AggregateExpression with or without alias. Now we would like to relax to complex aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) + 1}}
[jira] [Updated] (SPARK-24790) Allow complex aggregate expressions in Pivot
[ https://issues.apache.org/jira/browse/SPARK-24790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24790: Description: In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of PIVOT's aggregate expressions was exerted, which allows only an AggregateExpression with or without alias. Now we would like to relax the check to allow complex aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) + 1}} (was: In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of PIVOT's aggregate expressions was exerted, which allows only an AggregateExpression with or without alias. Now we would like to relax to complex aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) + 1}}) > Allow complex aggregate expressions in Pivot > > > Key: SPARK-24790 > URL: https://issues.apache.org/jira/browse/SPARK-24790 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Priority: Minor > > In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of > PIVOT's aggregate expressions was exerted, which allows only an > AggregateExpression with or without alias. Now we would like to relax the > check to allow complex aggregate expressions, like {{ceil(sum(col1))}} or > {{sum(col1) + 1}}
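With the relaxed check, a query like the following (illustrative sketch, reusing the {{courseSales}} table from other examples in this thread) would become legal:

{code}
SELECT * FROM (
  SELECT course, earnings FROM courseSales
)
PIVOT (
  ceil(sum(earnings)) AS s1, sum(earnings) + 1 AS s2
  FOR course IN ('dotNET', 'Java')
)
{code}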
[jira] [Updated] (SPARK-24696) ColumnPruning rule fails to remove extra Project
[ https://issues.apache.org/jira/browse/SPARK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24696: Description: The following test case would cause a "max iterations reached" error, i.e., an infinite loop in the optimizer.
{code}
test("optimization infinite loop") {
  withTempDir { dir =>
    val path = dir.getCanonicalPath
    import testImplicits._
    Seq((3, "a")).toDF("id", "value").write.format("parquet").mode("overwrite").save(path)
    import org.apache.spark.sql.functions.udf
    val filterIt = udf((value: Int) => value > 0).asNondeterministic
    spark.read.load(path)
      .where(filterIt('id))
      .where('id < 0)
      .select('id)
      .collect
  }
}
{code}
Error message:
{code}
Max iterations (100) reached for batch Operator Optimization before Inferring Filters, tree:
Filter (id#11 < 0)
+- Project [id#11]
   +- Filter if (isnull(id#11)) null else UDF(id#11)
      +- Relation[id#11,value#12] parquet
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, tree:
Filter (id#11 < 0)
+- Project [id#11]
   +- Filter if (isnull(id#11)) null else UDF(id#11)
      +- Relation[id#11,value#12] parquet
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:117)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68) at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3286) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2745) at org.apache.spark.sql.DataFrameSuite$$anonfun$94$$anonfun$apply$mcV$sp$199.apply(DataFrameSuite.scala:2343) at org.apache.spark.sql.DataFrameSuite$$anonfun$94$$anonfun$apply$mcV$sp$199.apply(DataFrameSuite.scala:2333) at org.apache.spark.sql.test.SQLTestUtilsBase$class.withTempDir(SQLTestUtils.scala:215) at org.apache.spark.sql.DataFrameSuite.withTempDir(DataFrameSuite.scala:43) at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply$mcV$sp(DataFrameSuite.scala:2333) at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply(DataFrameSuite.scala:2333) at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply(DataFrameSuite.scala:2333) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196) at org.apache.spark.sql.DataFrameSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(DataFrameSuite.scala:43) at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221) at org.apache.spark.sql.DataFrameSuite.runTest(DataFrameSuite.scala:43) at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461) at
[jira] [Updated] (SPARK-24596) Non-cascading Cache Invalidation
[ https://issues.apache.org/jira/browse/SPARK-24596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24596: Description: When invalidating a cache, we invalidate other caches dependent on it to ensure cached data is up to date. For example, when the underlying table has been modified or the table itself has been dropped, all caches that use this table should be invalidated or refreshed. However, in other cases, like when a user simply wants to drop a cache to free up memory, we do not need to invalidate dependent caches since no underlying data has been changed. For this reason, we would like to introduce a new cache invalidation mode: non-cascading cache invalidation. We choose between the existing mode and the new mode for the different cache invalidation scenarios: # Drop tables and regular (persistent) views: regular mode # Drop temporary views: non-cascading mode # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode # Call {{DataSet.unpersist()}}: non-cascading mode # Call {{Catalog.uncacheTable()}}: follow the same convention as dropping tables/views, that is, use non-cascading mode for temporary views and regular mode for the rest Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not), any query referring to that view should no longer be valid. Hence if a cached persistent view is dropped, we need to invalidate all the dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees consistent uncaching behavior between temporary views and unnamed DataSets. 
was: When invalidating a cache, we invalid other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed. However, in other cases, like when user simply want to drop a cache to free up memory, we do not need to invalidate dependent caches since no underlying data has been changed. For this reason, we would like to introduce a new cache invalidation mode: the non-cascading cache invalidation. And we choose between the existing mode and the new mode for different cache invalidation scenarios: # Drop tables and regular (persistent) views: regular mode # Drop temporary views: non-cascading mode # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode # Call DataSet.unpersist(): non-cascading mode Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not cached), any query referring to that view should no long be valid. Hence if a cached persistent view is dropped, we need to invalidate the all dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees a consistent uncaching behavior between temporary views and unnamed DataSets. > Non-cascading Cache Invalidation > > > Key: SPARK-24596 > URL: https://issues.apache.org/jira/browse/SPARK-24596 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Priority: Major > Fix For: 2.4.0 > > > When invalidating a cache, we invalid other caches dependent on this cache to > ensure cached data is up to date. 
For example, when the underlying table has > been modified or the table has been dropped itself, all caches that use this > table should be invalidated or refreshed. > However, in other cases, like when user simply want to drop a cache to free > up memory, we do not need to invalidate dependent caches since no underlying > data has been changed. For this reason, we would like to introduce a new > cache invalidation mode: the non-cascading cache invalidation. And we choose > between the existing mode and the new mode for different cache invalidation > scenarios: > # Drop tables and regular (persistent) views: regular mode > # Drop temporary views: non-cascading mode > # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode > # Call {{DataSet.unpersist()}}: non-cascading mode > # Call {{Catalog.uncacheTable()}}: follow the same convention as drop > tables/view, which is, use
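The cascading vs. non-cascading distinction above can be sketched with a toy dependency graph. {{ToyCacheManager}} is a hypothetical illustration, not Spark's actual CacheManager:

```scala
import scala.collection.mutable

// Toy cache manager (NOT Spark's CacheManager): tracks which caches read
// from which other caches, and supports both invalidation modes.
class ToyCacheManager {
  private val cached = mutable.Set.empty[String]
  private val deps = mutable.Map.empty[String, Set[String]] // cache -> caches it reads from

  def cache(name: String, dependsOn: Set[String] = Set.empty): Unit = {
    cached += name
    deps(name) = dependsOn
  }

  // cascade = true (regular mode): also drop every cache that transitively
  // depends on `name`, as when the underlying data changed.
  // cascade = false (non-cascading mode): drop only `name`, as for
  // Dataset.unpersist(), where the underlying data is still valid.
  def uncache(name: String, cascade: Boolean): Unit = {
    cached -= name
    if (cascade) {
      for (c <- cached.toList if deps.getOrElse(c, Set.empty).contains(name))
        uncache(c, cascade = true)
    }
  }

  def isCached(name: String): Boolean = cached(name)
}
```

In this sketch, non-cascading uncaching of a temporary view leaves dependent caches alive, while regular-mode invalidation of a table drops the whole dependent chain.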
[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches
[ https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24613: Description: When caching a query, we generate its execution plan from the query's logical plan. However, the logical plan we get from the Dataset has already been analyzed, and when we try to get the execution plan, this already analyzed logical plan will be analyzed again in the new QueryExecution object, and unfortunately some rules have side effects if applied multiple times, which in this case is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has an extra null-check and can't be matched against the same plan. The following test would fail since {{df2}}'s execution plan inside the CacheManager does not depend on {{df1}}.
{code:java}
test("cache UDF result correctly 2") {
  val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))
  df.cache()
  df.count()
  df2.cache()
  // udf has been evaluated during caching, and thus should not be re-evaluated here
  failAfter(5 seconds) {
    df2.collect()
  }
}
{code}
While it might be worth re-visiting such analysis rules, we can also fix the CacheManager to avoid these potential problems. was: When caching a query, we generate its execution plan from the query's logical plan. However, the logical plan we get from the Dataset has already been analyzed, and when we try the get the execution plan, this already analyzed logical plan will be analyzed again in the new QueryExecution object, and unfortunately some rules have side effects if applied multiple times, which in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has an extra null-check and can't be matched against the same plan. The following test would fail since {{df2}}'s execution plan inside the CacheManager does not depend on {{df1}}. 
{code:java} test("cache UDF result correctly 2") { val expensiveUDF = udf({x: Int => Thread.sleep(1); x}) val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a")) val df2 = df.agg(sum(df("b"))) df.cache() df.count() df2.cache() // udf has been evaluated during caching, and thus should not be re-evaluated here failAfter(5 seconds) { df2.collect() } } {code} > Cache with UDF could not be matched with subsequent dependent caches > > > Key: SPARK-24613 > URL: https://issues.apache.org/jira/browse/SPARK-24613 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Priority: Minor > Fix For: 2.4.0 > > > When caching a query, we generate its execution plan from the query's logical > plan. However, the logical plan we get from the Dataset has already been > analyzed, and when we try the get the execution plan, this already analyzed > logical plan will be analyzed again in the new QueryExecution object, and > unfortunately some rules have side effects if applied multiple times, which > in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan > now has an extra null-check and can't be matched against the same plan. The > following test would fail since {{df2}}'s execution plan inside the > CacheManager does not depend on {{df1}}. > {code:java} > test("cache UDF result correctly 2") { > val expensiveUDF = udf({x: Int => Thread.sleep(1); x}) > val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a")) > val df2 = df.agg(sum(df("b"))) > df.cache() > df.count() > df2.cache() > // udf has been evaluated during caching, and thus should not be > re-evaluated here > failAfter(5 seconds) { > df2.collect() > } > } > {code} > While it might be worth re-visiting such analysis rules, we can make also fix > the CacheManager to avoid these potential problems. 
[jira] [Created] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches
Maryann Xue created SPARK-24613: --- Summary: Cache with UDF could not be matched with subsequent dependent caches Key: SPARK-24613 URL: https://issues.apache.org/jira/browse/SPARK-24613 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Maryann Xue Fix For: 2.4.0 When caching a query, we generate its execution plan from the query's logical plan. However, the logical plan we get from the Dataset has already been analyzed, and when we try to get the execution plan, this already analyzed logical plan will be analyzed again in the new QueryExecution object, and unfortunately some rules have side effects if applied multiple times, which in this case is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has an extra null-check and can't be matched against the same plan. The following test would fail since {{df2}}'s execution plan inside the CacheManager does not depend on {{df1}}.
{code:java}
test("cache UDF result correctly 2") {
  val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))
  df.cache()
  df.count()
  df2.cache()
  // udf has been evaluated during caching, and thus should not be re-evaluated here
  failAfter(5 seconds) {
    df2.collect()
  }
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches
[ https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24613: Priority: Minor (was: Major) > Cache with UDF could not be matched with subsequent dependent caches > > > Key: SPARK-24613 > URL: https://issues.apache.org/jira/browse/SPARK-24613 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Priority: Minor > Fix For: 2.4.0 > > > When caching a query, we generate its execution plan from the query's logical > plan. However, the logical plan we get from the Dataset has already been > analyzed, and when we try the get the execution plan, this already analyzed > logical plan will be analyzed again in the new QueryExecution object, and > unfortunately some rules have side effects if applied multiple times, which > in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan > now has an extra null-check and can't be matched against the same plan. The > following test would fail since {{df2}}'s execution plan inside the > CacheManager does not depend on {{df1}}. > {code:java} > test("cache UDF result correctly 2") { > val expensiveUDF = udf({x: Int => Thread.sleep(1); x}) > val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a")) > val df2 = df.agg(sum(df("b"))) > df.cache() > df.count() > df2.cache() > // udf has been evaluated during caching, and thus should not be > re-evaluated here > failAfter(5 seconds) { > df2.collect() > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches
[ https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24613: Issue Type: Bug (was: Improvement) > Cache with UDF could not be matched with subsequent dependent caches > > > Key: SPARK-24613 > URL: https://issues.apache.org/jira/browse/SPARK-24613 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Priority: Major > Fix For: 2.4.0 > > > When caching a query, we generate its execution plan from the query's logical > plan. However, the logical plan we get from the Dataset has already been > analyzed, and when we try the get the execution plan, this already analyzed > logical plan will be analyzed again in the new QueryExecution object, and > unfortunately some rules have side effects if applied multiple times, which > in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan > now has an extra null-check and can't be matched against the same plan. The > following test would fail since {{df2}}'s execution plan inside the > CacheManager does not depend on {{df1}}. > {code:java} > test("cache UDF result correctly 2") { > val expensiveUDF = udf({x: Int => Thread.sleep(1); x}) > val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a")) > val df2 = df.agg(sum(df("b"))) > df.cache() > df.count() > df2.cache() > // udf has been evaluated during caching, and thus should not be > re-evaluated here > failAfter(5 seconds) { > df2.collect() > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24596) Non-cascading Cache Invalidation
Maryann Xue created SPARK-24596: --- Summary: Non-cascading Cache Invalidation Key: SPARK-24596 URL: https://issues.apache.org/jira/browse/SPARK-24596 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Maryann Xue Fix For: 2.4.0 When invalidating a cache, we invalidate other caches dependent on it to ensure cached data is up to date. For example, when the underlying table has been modified or the table itself has been dropped, all caches that use this table should be invalidated or refreshed. However, in other cases, like when a user simply wants to drop a cache to free up memory, we do not need to invalidate dependent caches since no underlying data has been changed. For this reason, we would like to introduce a new cache invalidation mode: non-cascading cache invalidation. We choose between the existing mode and the new mode for the different cache invalidation scenarios: # Drop tables and regular (persistent) views: regular mode # Drop temporary views: non-cascading mode # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode # Call DataSet.unpersist(): non-cascading mode Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not), any query referring to that view should no longer be valid. Hence if a cached persistent view is dropped, we need to invalidate all the dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees consistent uncaching behavior between temporary views and unnamed DataSets. 
[jira] [Created] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand
Maryann Xue created SPARK-24583: --- Summary: Wrong schema type in InsertIntoDataSourceCommand Key: SPARK-24583 URL: https://issues.apache.org/jira/browse/SPARK-24583 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Maryann Xue Fix For: 2.4.0 For a DataSource table whose schema contains a field with "nullable=false", when a user tries to insert a NULL value into this field, the input DataFrame will return an incorrect value or throw a NullPointerException. That's because the schema nullability of the input relation has been bluntly overridden with the destination schema by the code below in {{InsertIntoDataSourceCommand}}:
{code:java}
override def run(sparkSession: SparkSession): Seq[Row] = {
  val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
  val data = Dataset.ofRows(sparkSession, query)
  // Apply the schema of the existing table to the new data.
  val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
  relation.insert(df, overwrite)
  // Re-cache all cached plans (including this relation itself, if it's cached) that refer to this
  // data source relation.
  sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)
  Seq.empty[Row]
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
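One possible direction for a fix, sketched here with toy types rather than Spark's StructType/StructField (this is an illustrative assumption, not the actual patch): keep the table's field names and types when applying the existing schema, but take the union of the two nullability flags, so incoming NULLs are not read through a {{nullable=false}} field.

```scala
// Toy column descriptor (NOT Spark's StructField).
case class Col(name: String, dataType: String, nullable: Boolean)

// Apply the table's schema to incoming data, but widen nullability: a field is
// nullable if either the table schema or the input schema says so.
def applyTableSchema(tableSchema: Seq[Col], inputSchema: Seq[Col]): Seq[Col] =
  tableSchema.zip(inputSchema).map { case (t, i) =>
    t.copy(nullable = t.nullable || i.nullable)
  }
```

With such a merge, the overly strict {{nullable=false}} from the destination schema would no longer be imposed on input rows that can actually contain NULLs.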
[jira] [Created] (SPARK-24568) Code refactoring for DataType equalsXXX methods
Maryann Xue created SPARK-24568: --- Summary: Code refactoring for DataType equalsXXX methods Key: SPARK-24568 URL: https://issues.apache.org/jira/browse/SPARK-24568 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Maryann Xue Fix For: 2.4.0 Right now there is a lot of code duplication between all the DataType equalsXXX methods: {{equalsIgnoreNullability}}, {{equalsIgnoreCaseAndNullability}}, {{equalsIgnoreCompatibleNullability}}, {{equalsStructurally}}. We can replace the duplicated code with a helper function. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
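The deduplication idea can be sketched as one recursive walker parameterized by the per-field comparison, from which each equalsXXX variant is derived. The toy {{DataType}}/{{Field}} types and the {{equalsWith}} helper name are assumptions for illustration, not Spark's actual API:

```scala
// Toy type hierarchy (NOT Spark's DataType).
sealed trait DataType
case object IntType extends DataType
case class Field(name: String, dataType: DataType, nullable: Boolean)
case class StructType(fields: Seq[Field]) extends DataType

// Shared recursive walker: structural comparison, with the per-field rule
// supplied by each equalsXXX variant.
def equalsWith(a: DataType, b: DataType)(fieldEq: (Field, Field) => Boolean): Boolean =
  (a, b) match {
    case (StructType(fa), StructType(fb)) =>
      fa.length == fb.length && fa.zip(fb).forall { case (x, y) => fieldEq(x, y) }
    case _ => a == b
  }

// Each variant is now a one-liner supplying its own field comparison.
def equalsIgnoreNullability(a: DataType, b: DataType): Boolean =
  equalsWith(a, b)((x, y) => x.name == y.name && equalsIgnoreNullability(x.dataType, y.dataType))

def equalsIgnoreCaseAndNullability(a: DataType, b: DataType): Boolean =
  equalsWith(a, b)((x, y) =>
    x.name.equalsIgnoreCase(y.name) && equalsIgnoreCaseAndNullability(x.dataType, y.dataType))
```

The same pattern would cover the remaining variants by plugging in their own field predicates instead of repeating the recursion.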
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480237#comment-16480237 ] Maryann Xue commented on SPARK-24288: - Thank you for pointing this out, [~cloud_fan]! I made {{OptimizerBarrier}} inherit from {{UnaryNode}} as a proof of concept that can quickly pass the basic tests, but it was not the optimal solution. I've just created a PR, so you guys can all take a look. > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > Attachments: SPARK-24288.simple.patch > > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with JDBC datasource I saw that many "or" clauses with > non-equality operators causes huge performance degradation of SQL query > to database (DB2). For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > > 100)").show() // in real application whose predicates were pushed > many many lines below, many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in production system caching many sources is a > waste of memory (especially is pipeline is long and I must do cache many > times).There are also few more workarounds, but it would be great if Spark > will support preventing predicate pushdown by user. > > For example: df.withAnalysisBarrier().where(...) ? > > Note, that this should not be a global configuration option. 
If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down, but some may be, but df2 should have all > predicates pushed down, even if target query joins df1 and df2. As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478534#comment-16478534 ] Maryann Xue commented on SPARK-24288: - Yes, sure. This is more of a demonstration than a patch, so I'll clean it up, add more tests, and put together a PR if we decide to go down this path. In general I think we should not break the barrier as long as things can work out correctly. It doesn't have to be optimal any more since it's the user's intention to force a barrier. So do you think self-join resolving would work alright without breaking the barrier, or would it be a correctness problem? Could you please point me to a test case? > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > Attachments: SPARK-24288.simple.patch > > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with JDBC datasource I saw that many "or" clauses with > non-equality operators causes huge performance degradation of SQL query > to database (DB2). For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > > 100)").show() // in real application whose predicates were pushed > many many lines below, many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in production system caching many sources is a > waste of memory (especially is pipeline is long and I must do cache many > times).There are also few more workarounds, but it would be great if Spark > will support preventing predicate pushdown by user. > > For example: df.withAnalysisBarrier().where(...) ? 
> > Note, that this should not be a global configuration option. If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down, but some may be, but df2 should have all > predicates pushed down, even if target query joins df1 and df2. As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-24288: Attachment: SPARK-24288.simple.patch > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > Attachments: SPARK-24288.simple.patch > > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with JDBC datasource I saw that many "or" clauses with > non-equality operators causes huge performance degradation of SQL query > to database (DB2). For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > > 100)").show() // in real application whose predicates were pushed > many many lines below, many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in production system caching many sources is a > waste of memory (especially is pipeline is long and I must do cache many > times).There are also few more workarounds, but it would be great if Spark > will support preventing predicate pushdown by user. > > For example: df.withAnalysisBarrier().where(...) ? > > Note, that this should not be a global configuration option. If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down, but some may be, but df2 should have all > predicates pushed down, even if target query joins df1 and df2. 
As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477747#comment-16477747 ] Maryann Xue commented on SPARK-24288: - Thank you, [~cloud_fan] and [~TomaszGaweda], for the input! Here are some ideas from my side: Basically the question comes down to whether we 1) avoid predicate push-down to the data source level or 2) avoid predicate push-down (or other push-down optimizations) anywhere in the logical operator tree. For 1) we could use the first two solutions [~cloud_fan] has proposed or the hint solution [~smilegator] has proposed. For 2) we would use a logical barrier as I have mentioned earlier. I'm in favor of the hint approach because it would be an easy interface to use, though maybe less extensible. It can be used for the entire query or with the DataFrame interface as well. For example, like: {{spark.sql("select /*+ NO_PRED_PUSHDOWN */ from foobar where name like 'x%'")}} or {{spark.jdbc(...).where("name like 'x%'").hint("NO_PRED_PUSHDOWN")}} On the other hand, 2) is more powerful: for example, it can be used to push down only part of the predicates, or to push down certain operators but not others. And like I said, it might not require a whole lot of changes. I am attaching an experimental patch here as a quick illustration. > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with JDBC datasource I saw that many "or" clauses with > non-equality operators causes huge performance degradation of SQL query > to database (DB2). 
For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > > 100)").show() // in real application whose predicates were pushed > many many lines below, many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in production system caching many sources is a > waste of memory (especially is pipeline is long and I must do cache many > times).There are also few more workarounds, but it would be great if Spark > will support preventing predicate pushdown by user. > > For example: df.withAnalysisBarrier().where(...) ? > > Note, that this should not be a global configuration option. If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down, but some may be, but df2 should have all > predicates pushed down, even if target query joins df1 and df2. As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476985#comment-16476985 ]

Maryann Xue edited comment on SPARK-24288 at 5/16/18 7:46 AM:
--

The special operator would just look like

{{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}

or

{{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use, while the barrier would be more general (regardless of data source type) and would give more control to the user (it could even allow partial predicate push-down). If we were to implement other operator push-downs (e.g., agg, limit), the barrier would make more sense in terms of controlling up to which level things can be pushed down. I'm more inclined to do the hint now if that covers [~TomaszGaweda]'s use case.

was (Author: maryannxue):

The special operator would just look like

{{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}

or

{{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use, while the barrier would be more general (regardless of data source type) and would give more control to the user. If we were to implement other operator push-downs (e.g., agg, limit), the barrier would make more sense in terms of controlling up to which level things can be pushed down. I'm more inclined to do the hint now if that covers [~TomaszGaweda]'s use case.

> Enable preventing predicate pushdown
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476985#comment-16476985 ]

Maryann Xue commented on SPARK-24288:
-

The special operator would just look like

{{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}

or

{{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use, while the barrier would be more general (regardless of data source type) and would give more control to the user. If we were to implement other operator push-downs (e.g., agg, limit), the barrier would make more sense in terms of controlling up to which level things can be pushed down. I'm more inclined to do the hint now if that covers [~TomaszGaweda]'s use case.

> Enable preventing predicate pushdown
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476900#comment-16476900 ]

Maryann Xue commented on SPARK-24288:
-

After taking a closer look at the problem, I realized that we should probably define the scope of the problem first. The issue [~TomaszGaweda] encountered is specifically predicates being pushed down to the data source, while a more general topic is the isolation of plan optimization/transformation between operators, somewhat like the effect {{cache()}} would achieve.

1) For the first, specific purpose, i.e., avoiding predicate push-down to the data source, we could add an option when initiating the {{DataSource}}, e.g., by calling {{spark.read.option()}} or {{spark.jdbc()}} with options, etc. This might not cover all cases of DataSource creation, but it would be a workable solution for users.

2) For the more general purpose (which means we can stop push-down at any level), we could add a special logical operator (let's call it OptimizerBarrier, maybe) that serves as a barrier node when performing optimizations. Then we could simply ditch this node at the time of physical plan transformation.

I personally prefer the second solution, which serves a more general purpose and may require fewer code changes (I did a quick experiment and it worked). Please let me know your thoughts, [~smilegator], [~TomaszGaweda].

> Enable preventing predicate pushdown
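To make the OptimizerBarrier idea in 2) concrete, here is a minimal, self-contained sketch in plain Python (not Spark code; the class and function names, including {{OptimizerBarrier}}, are made up for illustration): a pushdown rule recurses through a toy plan tree but cannot rewrite across a barrier node, and the barrier is simply ditched when producing the "physical" plan.

```python
from dataclasses import dataclass

# Toy logical plan nodes.
@dataclass
class Scan:
    source: str

@dataclass
class Filter:
    condition: str
    child: object

@dataclass
class OptimizerBarrier:  # the hypothetical barrier operator from the comment
    child: object

def push_filters(plan):
    """Logical optimization: merge Filters into the Scan, but never across a barrier."""
    if isinstance(plan, Filter):
        child = push_filters(plan.child)
        if isinstance(child, Scan):
            # The predicate reaches the data source.
            return Scan(f"{child.source} WHERE {plan.condition}")
        return Filter(plan.condition, child)
    if isinstance(plan, OptimizerBarrier):
        # Optimize below the barrier, but keep it so rules above cannot see through.
        return OptimizerBarrier(push_filters(plan.child))
    return plan

def to_physical(plan):
    """Physical planning: simply ditch the barrier nodes."""
    if isinstance(plan, OptimizerBarrier):
        return to_physical(plan.child)
    if isinstance(plan, Filter):
        return Filter(plan.condition, to_physical(plan.child))
    return plan

pushed = to_physical(push_filters(Filter("name like 'x%'", Scan("foobar"))))
blocked = to_physical(push_filters(Filter("name like 'x%'", OptimizerBarrier(Scan("foobar")))))
print(pushed)   # Scan(source="foobar WHERE name like 'x%'")
print(blocked)  # Filter(condition="name like 'x%'", child=Scan(source='foobar'))
```

The key design point matches the comment: the barrier participates in optimization only as an opaque node and disappears before execution, so it costs nothing at runtime.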
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476372#comment-16476372 ]

Maryann Xue commented on SPARK-24288:
-

I agree with you, [~TomaszGaweda], that making this a special operator would be a better way than adding an option. It would be finer-grained both in terms of different DataFrames and in terms of different operators (for example, in the future Spark may be able to push down other operators to the data source).

> Enable preventing predicate pushdown
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476368#comment-16476368 ]

Maryann Xue commented on SPARK-24288:
-

Feel free to assign this to me then, [~smilegator].

> Enable preventing predicate pushdown
[jira] [Created] (SPARK-24164) Support column list as the pivot column in Pivot
Maryann Xue created SPARK-24164:
---

Summary: Support column list as the pivot column in Pivot
Key: SPARK-24164
URL: https://issues.apache.org/jira/browse/SPARK-24164
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue

This is part of the extension to Pivot SQL support tracked under SPARK-24035. Currently we only support a single column as the pivot column, while Pivot on a column list would look like:

{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR (course, year) IN (('dotNET', 2012), ('Java', 2013))
);{code}
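As a reading aid, the intended semantics of the multi-column pivot above can be emulated in a few lines of plain Python (illustration only, not Spark code): each (course, year) pair in the IN list becomes one output column, aggregated with sum.

```python
# courseSales rows as (year, course, earnings) tuples; the values are made up.
rows = [(2012, "dotNET", 10000), (2012, "Java", 20000),
        (2013, "dotNET", 48000), (2013, "Java", 30000)]

def pivot_sum(rows, pivot_values):
    """sum(earnings) FOR (course, year) IN pivot_values: one column per pair."""
    out = {f"{course}_{year}": 0 for course, year in pivot_values}
    for year, course, earnings in rows:
        if (course, year) in pivot_values:
            out[f"{course}_{year}"] += earnings
    return out

result = pivot_sum(rows, [("dotNET", 2012), ("Java", 2013)])
print(result)  # {'dotNET_2012': 10000, 'Java_2013': 30000}
```

Rows whose (course, year) pair is not listed simply contribute to no output column, which mirrors how literal IN lists behave in the single-column case.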
[jira] [Created] (SPARK-24163) Support "ANY" or sub-query for Pivot "IN" clause
Maryann Xue created SPARK-24163:
---

Summary: Support "ANY" or sub-query for Pivot "IN" clause
Key: SPARK-24163
URL: https://issues.apache.org/jira/browse/SPARK-24163
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue

This is part of the extension to Pivot SQL support tracked under SPARK-24035. Currently, only literal values are allowed in the Pivot "IN" clause. To support ANY or a sub-query in the "IN" clause (examples of which are provided below), we need to enable evaluation of a sub-query before/during query analysis time.

{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ANY
);{code}

{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN (
    SELECT course FROM courses WHERE region = 'AZ'
  )
);
{code}
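The ANY form can be read as first enumerating the distinct values of the pivot column and then pivoting as if they had been written as literals, which is exactly why the ANY/sub-query variants need a query evaluated before or during analysis. A toy rendering of that reading (plain Python, illustrative only; the data is made up):

```python
rows = [(2012, "dotNET", 10000), (2012, "Java", 20000), (2013, "dotNET", 48000)]

# "FOR course IN ANY": the pivot values are the distinct values of the pivot
# column, discovered by a query over the data itself before pivoting.
courses = sorted({course for _, course, _ in rows})

pivoted = {c: sum(e for _, cc, e in rows if cc == c) for c in courses}
print(courses)  # ['Java', 'dotNET']
print(pivoted)  # {'Java': 20000, 'dotNET': 58000}
```

A sub-query in the IN clause would work the same way, except the value list comes from evaluating that sub-query instead of scanning the pivoted data.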
[jira] [Created] (SPARK-24162) Support aliased literal values for Pivot "IN" clause
Maryann Xue created SPARK-24162:
---

Summary: Support aliased literal values for Pivot "IN" clause
Key: SPARK-24162
URL: https://issues.apache.org/jira/browse/SPARK-24162
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue

This is part of the extension to Pivot SQL support tracked under SPARK-24035. When literal values are specified in the Pivot IN clause, it would be nice to allow aliases for those values so that the output column names can be customized. For example:

{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ('dotNET' as c1, 'Java' as c2)
);{code}
[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419808#comment-16419808 ]

Maryann Xue commented on SPARK-20169:
-

[~smilegator], I think this is also caused by SPARK-23368, so could you please assign someone else to review https://github.com/apache/spark/pull/20613 if the original reviewer still does not respond in a few days?

> Groupby Bug with Sparksql
> -
>
> Key: SPARK-20169
> URL: https://issues.apache.org/jira/browse/SPARK-20169
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Bin Wu
> Priority: Major
>
> We found a potential bug in the Catalyst optimizer: it cannot correctly
> process "groupBy". You can reproduce it with the following simple example:
> =
> from pyspark.sql.functions import *
> #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
> e = spark.read.csv("graph.csv", header=True)
> r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
> r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
> jr = e.join(r1, 'src')
> jr.show()
> r2 = jr.groupBy('dst').count()
> r2.show()
> =
> FYI, "graph.csv" contains exactly the same data as the commented line.
> You can see that jr is:
> |src|dst|count|
> | 3| 1| 1|
> | 1| 4| 3|
> | 1| 3| 3|
> | 1| 2| 3|
> | 4| 1| 1|
> | 2| 1| 1|
> But after the last groupBy, the 3 rows with dst = 1 are not grouped together:
> |dst|count|
> | 1| 1|
> | 4| 1|
> | 3| 1|
> | 2| 1|
> | 1| 1|
> | 1| 1|
> If we build jr directly from the raw data (commented line), this error does
> not show up. So we suspect that there is a bug in the Catalyst optimizer when
> multiple joins and groupBy's are being optimized.
[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418282#comment-16418282 ]

Maryann Xue commented on SPARK-20169:
-

This was related to the use of {{withColumnRenamed}}, which adds a {{Project}} on top of the original relation. Later on, when getting the {{outputPartitioning}} of the {{Project}}, it returned HashPartitioning("dst") while it should have returned HashPartitioning("src"), since the column had been renamed. As a result, an {{Exchange}} node was missing between the two outermost HashAggregate nodes, which is why the aggregate result was incorrect.

{code:java}
test("SPARK-20169") {
  val e = Seq((1, 2), (1, 3), (1, 4), (2, 1), (3, 1), (4, 1)).toDF("src", "dst")
  val r = Seq((1), (2), (3), (4)).toDF("src")
  val r1 = e.join(r, "src" :: Nil).groupBy("dst").count().withColumnRenamed("dst", "src")
  val jr = e.join(r1, "src" :: Nil)
  val r2 = jr.groupBy("dst").count
  r2.explain()
  r2.show
}
{code}

The physical plan resulting from this bug turned out to be

{code}
== Physical Plan ==
*(2) HashAggregate(keys=[dst#181], functions=[count(1)])
+- *(2) HashAggregate(keys=[dst#181], functions=[partial_count(1)])
   +- *(2) Project [dst#181]
      +- *(2) BroadcastHashJoin [src#180], [src#197], Inner, BuildLeft
         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
         :  +- LocalTableScan [src#180, dst#181]
         +- *(2) HashAggregate(keys=[dst#181], functions=[])
            +- Exchange hashpartitioning(dst#181, 5)
               +- *(1) HashAggregate(keys=[dst#181], functions=[])
                  +- *(1) Project [dst#181]
                     +- *(1) BroadcastHashJoin [src#180], [src#187], Inner, BuildRight
                        :- LocalTableScan [src#180, dst#181]
                        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                           +- LocalTableScan [src#187]
{code}

The correct physical plan should be

{code}
== Physical Plan ==
*(3) HashAggregate(keys=[dst#181], functions=[count(1)])
+- Exchange hashpartitioning(dst#181, 5)
   +- *(2) HashAggregate(keys=[dst#181], functions=[partial_count(1)])
      +- *(2) Project [dst#181]
         +- *(2) BroadcastHashJoin [src#180], [src#197], Inner, BuildLeft
            :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
            :  +- LocalTableScan [src#180, dst#181]
            +- *(2) HashAggregate(keys=[dst#181], functions=[])
               +- Exchange hashpartitioning(dst#181, 5)
                  +- *(1) HashAggregate(keys=[dst#181], functions=[])
                     +- *(1) Project [dst#181]
                        +- *(1) BroadcastHashJoin [src#180], [src#187], Inner, BuildRight
                           :- LocalTableScan [src#180, dst#181]
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- LocalTableScan [src#187]
{code}

This happens to be the same issue as SPARK-23368, and the PR for SPARK-23368 fixes it.

> Groupby Bug with Sparksql
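The root cause described above can be condensed into a tiny model (plain Python; `project_partitioning`, `needs_exchange`, and the attribute-name strings are made up for illustration, while Spark's real implementation works on resolved attribute IDs, not names): a projection must rewrite the child's partitioning through its aliases, and grouping on a column the output is not partitioned on requires an Exchange.

```python
def project_partitioning(child_partitioning, project_list):
    """Map the child's hash-partitioning column through a projection.

    project_list is a list of (output_name, input_name) aliases. Returns the
    output-side name the data is partitioned on, or None if the partitioning
    column is not carried through the projection at all."""
    alias = {inp: out for out, inp in project_list}
    return alias.get(child_partitioning)

def needs_exchange(grouping_column, output_partitioning):
    # A shuffle is required unless the data is already partitioned on the key.
    return grouping_column != output_partitioning

# r1 = ...groupBy('dst').count().withColumnRenamed('dst', 'src'):
# the child is hash-partitioned on 'dst'; the projection renames dst -> src.
out_part = project_partitioning("dst", [("src", "dst"), ("count", "count")])
print(out_part)                         # 'src'
print(needs_exchange("dst", out_part))  # True: the Exchange missing in SPARK-20169
```

Reporting the partitioning in terms of the pre-rename column (the bug) makes `needs_exchange` wrongly return False, so the planner silently drops the shuffle and the final groupBy aggregates over mis-distributed rows.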
[jira] [Commented] (SPARK-23368) Avoid unnecessary Exchange or Sort after projection
[ https://issues.apache.org/jira/browse/SPARK-23368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366374#comment-16366374 ]

Maryann Xue commented on SPARK-23368:
-

[~cloud_fan], [~smilegator], could you please help review this PR? Thanks in advance!

> Avoid unnecessary Exchange or Sort after projection
> ---
>
> Key: SPARK-23368
> URL: https://issues.apache.org/jira/browse/SPARK-23368
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Maryann Xue
> Priority: Minor
>
> After a column-rename projection, the ProjectExec's outputOrdering and
> outputPartitioning should reflect the projected columns as well. For example,
> {code:java}
> SELECT b1
> FROM (
>   SELECT a a1, b b1
>   FROM testData2
>   ORDER BY a
> )
> ORDER BY a1{code}
> The inner query is ordered on a1 as well. If we had a rule to eliminate Sort
> on an already-sorted result, together with this fix, the order-by in the outer
> query could have been optimized out.
>
> Similarly, the query below
> {code:java}
> SELECT *
> FROM (
>   SELECT t1.a a1, t2.a a2, t1.b b1, t2.b b2
>   FROM testData2 t1
>   LEFT JOIN testData2 t2
>   ON t1.a = t2.a
> )
> JOIN testData2 t3
> ON a1 = t3.a{code}
> is equivalent to
> {code:java}
> SELECT *
> FROM testData2 t1
> LEFT JOIN testData2 t2
> ON t1.a = t2.a
> JOIN testData2 t3
> ON t1.a = t3.a{code}
> so the unnecessary sorting and hash-partitioning that have been optimized
> out for the second query should have been eliminated in the first query as well.
[jira] [Updated] (SPARK-23368) Avoid unnecessary Exchange or Sort after projection
[ https://issues.apache.org/jira/browse/SPARK-23368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maryann Xue updated SPARK-23368:
Summary: Avoid unnecessary Exchange or Sort after projection (was: OutputOrdering and OutputPartitioning in ProjectExec should reflect the projected columns)

> Avoid unnecessary Exchange or Sort after projection
[jira] [Created] (SPARK-23368) OutputOrdering and OutputPartitioning in ProjectExec should reflect the projected columns
Maryann Xue created SPARK-23368:
---

Summary: OutputOrdering and OutputPartitioning in ProjectExec should reflect the projected columns
Key: SPARK-23368
URL: https://issues.apache.org/jira/browse/SPARK-23368
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue

After a column-rename projection, the ProjectExec's outputOrdering and outputPartitioning should reflect the projected columns as well. For example,

{code:java}
SELECT b1
FROM (
  SELECT a a1, b b1
  FROM testData2
  ORDER BY a
)
ORDER BY a1{code}

The inner query is ordered on a1 as well. If we had a rule to eliminate Sort on an already-sorted result, together with this fix, the order-by in the outer query could have been optimized out.

Similarly, the query below

{code:java}
SELECT *
FROM (
  SELECT t1.a a1, t2.a a2, t1.b b1, t2.b b2
  FROM testData2 t1
  LEFT JOIN testData2 t2
  ON t1.a = t2.a
)
JOIN testData2 t3
ON a1 = t3.a{code}

is equivalent to

{code:java}
SELECT *
FROM testData2 t1
LEFT JOIN testData2 t2
ON t1.a = t2.a
JOIN testData2 t3
ON t1.a = t3.a{code}

so the unnecessary sorting and hash-partitioning that have been optimized out for the second query should have been eliminated in the first query as well.
[jira] [Commented] (SPARK-22266) The same aggregate function was evaluated multiple times
[ https://issues.apache.org/jira/browse/SPARK-22266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203105#comment-16203105 ] Maryann Xue commented on SPARK-22266: - Thank you for the comment, [~maropu]! I think it is a problem of common subexpression elimination, but feel like this kind of CSE had better be performed high level rather than in code-gen. And PhysicalAggregation is designed to pull out the aggregate functions from result expressions so that later on at code-gen (and non-code-gen as well) stage HashAggregate can assume that all aggregate functions are purely aggregation and no other expressions. So if CSE in code-gen were to handle this, we would end up having extra non-aggregate expressions in "aggregate expression" list. Not sure if we should have a logical/physical optimization dedicated to CSE, but I think it would be nice. I applied a simple straightforward fix in the PR. Could you please review? Thank you in advance! > The same aggregate function was evaluated multiple times > > > Key: SPARK-22266 > URL: https://issues.apache.org/jira/browse/SPARK-22266 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Maryann Xue >Priority: Minor > > We should avoid the same aggregate function being evaluated more than once, > and this is what has been stated in the code comment below > (patterns.scala:206). However things didn't work as expected. > {code} > // A single aggregate expression might appear multiple times in > resultExpressions. > // In order to avoid evaluating an individual aggregate function > multiple times, we'll > // build a set of the distinct aggregate expressions and build a > function which can > // be used to re-write expressions so that they reference the single > copy of the > // aggregate function which actually gets computed. 
> {code} > For example, the physical plan of > {code} > SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a > {code} > was > {code} > HashAggregate(keys=[a#23], functions=[max((b#24 + 1)), max((b#24 + 1))], > output=[a#23, max((b + 1))#223, (max((b + 1)) + 1)#224]) > +- HashAggregate(keys=[a#23], functions=[partial_max((b#24 + 1)), > partial_max((b#24 + 1))], output=[a#23, max#231, max#232]) >+- SerializeFromObject [assertnotnull(input[0, > org.apache.spark.sql.test.SQLTestData$TestData2, true]).a AS a#23, > assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, > true]).b AS b#24] > +- Scan ExternalRDDScan[obj#22] > {code} > where in each HashAggregate there were two identical aggregate functions > "max(b#24 + 1)".
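The deduplication that the quoted comment from patterns.scala promises can be illustrated with a toy sketch. Expressions here are plain strings rather than Catalyst trees, and all names are illustrative: collect the distinct aggregate expressions once, then rewrite the result expressions to reference the single computed copy.

```scala
// Result expressions of: SELECT a, max(b + 1), max(b + 1) + 1 ... GROUP BY a
val resultExprs = Seq("a", "max(b + 1)", "max(b + 1) + 1")

// 1. Deduplicate: "max(b + 1)" appears twice but is kept only once.
val aggExprs = resultExprs
  .flatMap(e => """max\([^)]*\)""".r.findAllIn(e))
  .distinct

// 2. Give each distinct aggregate one output attribute, then rewrite the
//    result expressions to reference that attribute.
val attrFor = aggExprs.zipWithIndex.map { case (e, i) => e -> s"agg$i" }.toMap
val rewritten = resultExprs.map { e =>
  attrFor.foldLeft(e) { case (acc, (agg, attr)) => acc.replace(agg, attr) }
}

assert(aggExprs == Seq("max(b + 1)"))          // evaluated once, not twice
assert(rewritten == Seq("a", "agg0", "agg0 + 1"))
```

With this rewrite the physical plan would carry a single `max((b#24 + 1))` per HashAggregate instead of the two identical copies shown in the plan above.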
[jira] [Created] (SPARK-22266) The same aggregate function was evaluated multiple times
Maryann Xue created SPARK-22266: --- Summary: The same aggregate function was evaluated multiple times Key: SPARK-22266 URL: https://issues.apache.org/jira/browse/SPARK-22266 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Maryann Xue Priority: Minor We should avoid the same aggregate function being evaluated more than once, and this is what has been stated in the code comment below (patterns.scala:206). However, things didn't work as expected. {code} // A single aggregate expression might appear multiple times in resultExpressions. // In order to avoid evaluating an individual aggregate function multiple times, we'll // build a set of the distinct aggregate expressions and build a function which can // be used to re-write expressions so that they reference the single copy of the // aggregate function which actually gets computed. {code} For example, the physical plan of {code} SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a {code} was {code} HashAggregate(keys=[a#23], functions=[max((b#24 + 1)), max((b#24 + 1))], output=[a#23, max((b + 1))#223, (max((b + 1)) + 1)#224]) +- HashAggregate(keys=[a#23], functions=[partial_max((b#24 + 1)), partial_max((b#24 + 1))], output=[a#23, max#231, max#232]) +- SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true]).a AS a#23, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true]).b AS b#24] +- Scan ExternalRDDScan[obj#22] {code} where in each HashAggregate there were two identical aggregate functions "max(b#24 + 1)".
[jira] [Commented] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
[ https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172169#comment-16172169 ] Maryann Xue commented on SPARK-21998: - Thanks again for your comment, [~maropu]! I changed the title and description of this JIRA accordingly and created a PR as https://github.com/apache/spark/pull/19281. Could you please take a look? > SortMergeJoinExec did not calculate its outputOrdering correctly during > physical planning > - > > Key: SPARK-21998 > URL: https://issues.apache.org/jira/browse/SPARK-21998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Maryann Xue >Priority: Minor > > Right now the calculation of SortMergeJoinExec's outputOrdering relies on the > fact that its children have already been sorted on the join keys, while this > is often not true until EnsureRequirements has been applied. > {code} > /** >* For SMJ, child's output must have been sorted on key or expressions with > the same order as >* key, so we can get ordering for key from child's output ordering. >*/ > private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: > Seq[SortOrder]) > : Seq[SortOrder] = { > keys.zip(childOutputOrdering).map { case (key, childOrder) => > SortOrder(key, Ascending, childOrder.sameOrderExpressions + > childOrder.child - key) > } > } > {code} > Thus SortMergeJoinExec's outputOrdering is most likely not correct during the > physical planning stage, and as a result, potential physical optimizations > that rely on the required/output orderings, like SPARK-18591, will not work > for SortMergeJoinExec. > The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be: > 1. If the childOutputOrdering satisfies (is a superset of) the required child > ordering => childOutputOrdering > 2. 
Otherwise => required child ordering
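The two-case behavior proposed in the comment above can be sketched in a simplified model. The types and the `satisfies` helper are stand-ins, not Spark's actual API: keep the child's output ordering when it already satisfies the required key ordering, otherwise report the required ordering that EnsureRequirements will later enforce by inserting a sort.

```scala
case class SortOrder(expr: String, ascending: Boolean = true)

// Simplified "satisfies": the required ordering is a prefix of the output
// ordering (Spark's real check also considers sameOrderExpressions).
def satisfies(output: Seq[SortOrder], required: Seq[SortOrder]): Boolean =
  output.take(required.length) == required

def getKeyOrdering(requiredOrdering: Seq[SortOrder],
                   childOutputOrdering: Seq[SortOrder]): Seq[SortOrder] =
  if (satisfies(childOutputOrdering, requiredOrdering)) childOutputOrdering
  else requiredOrdering

val keyOrder = Seq(SortOrder("a"))
// Case 1: child already sorted on (a, b) -> keep the richer child ordering.
assert(getKeyOrdering(keyOrder, Seq(SortOrder("a"), SortOrder("b"))) ==
  Seq(SortOrder("a"), SortOrder("b")))
// Case 2: child not yet sorted -> report the required key ordering, which
// is what the plan will actually produce once EnsureRequirements runs.
assert(getKeyOrdering(keyOrder, Seq()) == keyOrder)
```

Unlike the buggy `keys.zip(childOutputOrdering)` version quoted above, this sketch never returns an empty ordering just because the child has not been sorted yet.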
[jira] [Updated] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
[ https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-21998: Description: Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. {code} /** * For SMJ, child's output must have been sorted on key or expressions with the same order as * key, so we can get ordering for key from child's output ordering. */ private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder]) : Seq[SortOrder] = { keys.zip(childOutputOrdering).map { case (key, childOrder) => SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key) } } {code} Thus SortMergeJoinExec's outputOrdering is most likely not correct during the physical planning stage, and as a result, potential physical optimizations that rely on the required/output orderings, like SPARK-18591, will not work for SortMergeJoinExec. The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be: 1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering 2. Otherwise => required child ordering was: Right now SortMergeJoinExec calculates its outputOrdering based on its children's outputOrdering, thus oftentimes the SortMergeJoinExec's outputOrdering is NOT correct until after EnsureRequirements, which happens at a rather late stage. As a result, potential optimizations that rely on the required/output orderings, like SPARK-18591, will not work for SortMergeJoinExec. Unlike operators like Project or Filter, which simply preserve the ordering of their inputs, the SortMergeJoinExec has a behavior that generates a new ordering in its output regardless of the orderings of its children. I think the code below together with its comment is buggy. 
{code} /** * For SMJ, child's output must have been sorted on key or expressions with the same order as * key, so we can get ordering for key from child's output ordering. */ private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder]) : Seq[SortOrder] = { keys.zip(childOutputOrdering).map { case (key, childOrder) => SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key) } } {code} > SortMergeJoinExec did not calculate its outputOrdering correctly during > physical planning > - > > Key: SPARK-21998 > URL: https://issues.apache.org/jira/browse/SPARK-21998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Maryann Xue >Priority: Minor > > Right now the calculation of SortMergeJoinExec's outputOrdering relies on the > fact that its children have already been sorted on the join keys, while this > is often not true until EnsureRequirements has been applied. > {code} > /** >* For SMJ, child's output must have been sorted on key or expressions with > the same order as >* key, so we can get ordering for key from child's output ordering. >*/ > private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: > Seq[SortOrder]) > : Seq[SortOrder] = { > keys.zip(childOutputOrdering).map { case (key, childOrder) => > SortOrder(key, Ascending, childOrder.sameOrderExpressions + > childOrder.child - key) > } > } > {code} > Thus SortMergeJoinExec's outputOrdering is most likely not correct during the > physical planning stage, and as a result, potential physical optimizations > that rely on the required/output orderings, like SPARK-18591, will not work > for SortMergeJoinExec. > The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be: > 1. If the childOutputOrdering satisfies (is a superset of) the required child > ordering => childOutputOrdering > 2. 
Otherwise => required child ordering
[jira] [Updated] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
[ https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-21998: Summary: SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning (was: SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering) > SortMergeJoinExec did not calculate its outputOrdering correctly during > physical planning > - > > Key: SPARK-21998 > URL: https://issues.apache.org/jira/browse/SPARK-21998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Maryann Xue >Priority: Minor > > Right now SortMergeJoinExec calculates its outputOrdering based on its > children's outputOrdering, thus oftentimes the SortMergeJoinExec's > outputOrdering is NOT correct until after EnsureRequirements, which happens > at a rather late stage. As a result, potential optimizations that rely on the > required/output orderings, like SPARK-18591, will not work for > SortMergeJoinExec. > Unlike operators like Project or Filter, which simply preserve the ordering > of their inputs, the SortMergeJoinExec has a behavior that generates a new > ordering in its output regardless of the orderings of its children. I think > the code below together with its comment is buggy. > {code} > /** >* For SMJ, child's output must have been sorted on key or expressions with > the same order as >* key, so we can get ordering for key from child's output ordering. >*/ > private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: > Seq[SortOrder]) > : Seq[SortOrder] = { > keys.zip(childOutputOrdering).map { case (key, childOrder) => > SortOrder(key, Ascending, childOrder.sameOrderExpressions + > childOrder.child - key) > } > } > {code}
[jira] [Commented] (SPARK-21998) SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering
[ https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165765#comment-16165765 ] Maryann Xue commented on SPARK-21998: - Thank you for pointing this out, [~maropu]! Yes, you are right. It would be more accurate to say that the outputOrdering of SortMergeJoinExec should be a superset of (i.e., satisfy) the join key ordering. Let's look at {{SELECT * FROM t1 JOIN t2 ON t1.a = t2.z}} for example, and focus on {{t1}} for simplicity: 1) If t1 is not sorted on a, the SMJ outputOrdering should be (a, ASC) 2) If t1 is sorted on a, the SMJ outputOrdering should be (a, ASC) 3) If t1 is sorted both on a and b, the SMJ outputOrdering should at least include (a, ASC), and whether the SMJ output is still sorted on b remains implementation specific; in the current Spark SQL implementation it surely is. But we can imagine a less smart SMJ implementation that cannot avoid an extra sort on t1 and meanwhile uses an unstable sort that destroys the sortedness of column b. Anyway, my point is that the current SMJ implementation will return NOTHING as outputOrdering when the child ordering is not immediately available, and can only return the right value at a later stage, after EnsureRequirements happens. You can try my query example in https://issues.apache.org/jira/browse/SPARK-18591?focusedCommentId=16165148=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16165148 with your proposed change for SPARK-18591 and see what's going on. 
> SortMergeJoinExec should calculate its outputOrdering independent of its > children's outputOrdering > -- > > Key: SPARK-21998 > URL: https://issues.apache.org/jira/browse/SPARK-21998 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Maryann Xue >Priority: Minor > > Right now SortMergeJoinExec calculates its outputOrdering based on its > children's outputOrdering, thus oftentimes the SortMergeJoinExec's > outputOrdering is NOT correct until after EnsureRequirements, which happens > at a rather late stage. As a result, potential optimizations that rely on the > required/output orderings, like SPARK-18591, will not work for > SortMergeJoinExec. > Unlike operators like Project or Filter, which simply preserve the ordering > of their inputs, the SortMergeJoinExec has a behavior that generates a new > ordering in its output regardless of the orderings of its children. I think > the code below together with its comment is buggy. > {code} > /** >* For SMJ, child's output must have been sorted on key or expressions with > the same order as >* key, so we can get ordering for key from child's output ordering. >*/ > private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: > Seq[SortOrder]) > : Seq[SortOrder] = { > keys.zip(childOutputOrdering).map { case (key, childOrder) => > SortOrder(key, Ascending, childOrder.sameOrderExpressions + > childOrder.child - key) > } > } > {code}
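The three t1 cases discussed in the comment above can be checked against a toy "satisfies" model. Names are illustrative, and the prefix check is a simplification of Spark's real ordering-satisfaction logic:

```scala
case class SortOrder(expr: String, ascending: Boolean = true)

// An output ordering satisfies a required ordering when the required
// ordering is a prefix of it (simplified model).
def satisfies(output: Seq[SortOrder], required: Seq[SortOrder]): Boolean =
  output.take(required.length) == required

val requiredOnA = Seq(SortOrder("a"))
// 1) t1 not sorted on a: its ordering does not satisfy the key ordering,
//    so SMJ must still guarantee (a, ASC) itself.
assert(!satisfies(Seq(), requiredOnA))
// 2) t1 sorted on a: (a, ASC) satisfies the key ordering exactly.
assert(satisfies(Seq(SortOrder("a")), requiredOnA))
// 3) t1 sorted on (a, b): still satisfies; whether the b-sortedness
//    survives the join is implementation specific.
assert(satisfies(Seq(SortOrder("a"), SortOrder("b")), requiredOnA))
```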
[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted
[ https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165148#comment-16165148 ] Maryann Xue commented on SPARK-18591: - Hey, I came up with this idea of doing sort-aggregate on sorted input while debugging a performance issue, and it was before I saw this JIRA. I tried implementing it and found out that I couldn't do it in physical planning because it was top-down, so I ended up doing the same thing as https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55. I totally agree with [~maropu] that we need to make the physical planning bottom-up, and then we can solve this in a better way. But aside from this, I found another issue with a slightly more sophisticated query example: {{SELECT t1.a, count\(*\) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a}} This would only work if we put the {{ReplaceSortAggregate}} rule after {{EnsureRequirements}}, because SortMergeJoinExec's outputOrdering is not correct before EnsureRequirements happens. Please refer to SPARK-21998 for details. > Replace hash-based aggregates with sort-based ones if inputs already sorted > --- > > Key: SPARK-18591 > URL: https://issues.apache.org/jira/browse/SPARK-18591 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.2 >Reporter: Takeshi Yamamuro > > Spark currently uses sort-based aggregates only under limited conditions; the > cases where Spark cannot use partial aggregates and hash-based ones. > However, if the input ordering has already satisfied the requirements of > sort-based aggregates, it seems sort-based ones are faster than the > hash-based ones. 
> {code} > ./bin/spark-shell --conf spark.sql.shuffle.partitions=1 > val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS > value").sort($"key").cache > def timer[R](block: => R): R = { > val t0 = System.nanoTime() > val result = block > val t1 = System.nanoTime() > println("Elapsed time: " + ((t1 - t0 + 0.0) / 1000000000.0) + "s") > result > } > timer { > df.groupBy("key").count().count > } > // codegen'd hash aggregate > Elapsed time: 7.116962977s > // non-codegen'd sort aggregate > Elapsed time: 3.088816662s > {code} > If codegen'd sort-based aggregates are supported in SPARK-16844, this seems > to make the performance gap bigger; > {code} > - codegen'd sort aggregate > Elapsed time: 1.645234684s > {code} > Therefore, it'd be better to use sort-based ones in this case.
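The decision the proposed rule would make can be sketched as follows. This is a hypothetical model (the `AggStrategy` types and `chooseAggregate` are illustrative, not Spark's API): prefer a sort-based aggregate when the child's output ordering already covers the grouping keys, since reusing the existing order avoids building a hash map.

```scala
case class SortOrder(expr: String, ascending: Boolean = true)

sealed trait AggStrategy
case object HashAgg extends AggStrategy
case object SortAgg extends AggStrategy

// Pick sort-based aggregation when a prefix of the child's output ordering
// covers all grouping keys; fall back to hash-based aggregation otherwise.
def chooseAggregate(groupingKeys: Seq[String],
                    childOrdering: Seq[SortOrder]): AggStrategy = {
  val prefix = childOrdering.take(groupingKeys.length).map(_.expr)
  if (groupingKeys.nonEmpty && prefix.toSet == groupingKeys.toSet) SortAgg
  else HashAgg
}

// GROUP BY key over input already sorted on key -> sort-based aggregate.
assert(chooseAggregate(Seq("key"), Seq(SortOrder("key"))) == SortAgg)
// Unsorted input -> hash-based aggregate, as Spark does today.
assert(chooseAggregate(Seq("key"), Seq()) == HashAgg)
```

As the comment above notes, such a rule only sees correct child orderings if it runs after EnsureRequirements, which is exactly the SPARK-21998 issue.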
[jira] [Created] (SPARK-21998) SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering
Maryann Xue created SPARK-21998: --- Summary: SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering Key: SPARK-21998 URL: https://issues.apache.org/jira/browse/SPARK-21998 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Maryann Xue Priority: Minor Right now SortMergeJoinExec calculates its outputOrdering based on its children's outputOrdering, thus oftentimes the SortMergeJoinExec's outputOrdering is NOT correct until after EnsureRequirements, which happens at a rather late stage. As a result, potential optimizations that rely on the required/output orderings, like SPARK-18591, will not work for SortMergeJoinExec. Unlike operators like Project or Filter, which simply preserve the ordering of their inputs, the SortMergeJoinExec has a behavior that generates a new ordering in its output regardless of the orderings of its children. I think the code below together with its comment is buggy. {code} /** * For SMJ, child's output must have been sorted on key or expressions with the same order as * key, so we can get ordering for key from child's output ordering. */ private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder]) : Seq[SortOrder] = { keys.zip(childOutputOrdering).map { case (key, childOrder) => SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key) } } {code}