[jira] [Created] (SPARK-29060) Add tree traversal helper for adaptive spark plans

2019-09-11 Thread Maryann Xue (Jira)
Maryann Xue created SPARK-29060:
---

 Summary: Add tree traversal helper for adaptive spark plans
 Key: SPARK-29060
 URL: https://issues.apache.org/jira/browse/SPARK-29060
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue









[jira] [Created] (SPARK-29002) Avoid changing SMJ to BHJ if the build side has a high ratio of empty partitions

2019-09-05 Thread Maryann Xue (Jira)
Maryann Xue created SPARK-29002:
---

 Summary: Avoid changing SMJ to BHJ if the build side has a high 
ratio of empty partitions
 Key: SPARK-29002
 URL: https://issues.apache.org/jira/browse/SPARK-29002
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue









[jira] [Created] (SPARK-28959) Support `EqualNullSafe` as join condition in Dynamic Partition Pruning

2019-09-03 Thread Maryann Xue (Jira)
Maryann Xue created SPARK-28959:
---

 Summary: Support `EqualNullSafe` as join condition in Dynamic 
Partition Pruning
 Key: SPARK-28959
 URL: https://issues.apache.org/jira/browse/SPARK-28959
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


See [https://github.com/apache/spark/pull/25600#discussion_r318415489].






[jira] [Created] (SPARK-28888) Implement Dynamic Partition Pruning

2019-08-27 Thread Maryann Xue (Jira)
Maryann Xue created SPARK-28888:
---

 Summary: Implement Dynamic Partition Pruning
 Key: SPARK-28888
 URL: https://issues.apache.org/jira/browse/SPARK-28888
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue









[jira] [Created] (SPARK-28753) Dynamically reuse subqueries in AQE

2019-08-15 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-28753:
---

 Summary: Dynamically reuse subqueries in AQE
 Key: SPARK-28753
 URL: https://issues.apache.org/jira/browse/SPARK-28753
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue









[jira] [Created] (SPARK-28583) Subqueries should not call `onUpdatePlan` in Adaptive Query Execution

2019-07-31 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-28583:
---

 Summary: Subqueries should not call `onUpdatePlan` in Adaptive 
Query Execution
 Key: SPARK-28583
 URL: https://issues.apache.org/jira/browse/SPARK-28583
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


Subqueries do not have their own execution id, so when calling 
{{AdaptiveSparkPlanExec.onUpdatePlan}}, it will actually get the 
{{QueryExecution}} instance of the main query, which is wasteful and 
problematic. It could cause issues like stack overflows or deadlocks in some 
circumstances.
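
To illustrate the failure mode with a self-contained sketch (this is not Spark's actual internals; the registry below is hypothetical), a lookup keyed only by the execution id inherited from the main query can only ever return the main query's {{QueryExecution}}:
{code:scala}
// Hypothetical sketch: an execution-id -> QueryExecution registry (QueryExecution
// is stood in for by a String). A subquery has no id of its own, so it inherits
// the main query's id and therefore resolves to the main query's QueryExecution.
object SubqueryExecutionIdDemo extends App {
  val registry = scala.collection.mutable.Map.empty[Long, String]

  registry(42L) = "QueryExecution(main query)"

  val mainQueryId = 42L
  val subqueryId  = mainQueryId   // no dedicated execution id for the subquery

  // An onUpdatePlan-style lookup performed from inside the subquery:
  println(registry.get(subqueryId))   // Some(QueryExecution(main query))
}
{code}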






[jira] [Created] (SPARK-28057) Add method `clone` in catalyst TreeNode

2019-06-14 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-28057:
---

 Summary: Add method `clone` in catalyst TreeNode
 Key: SPARK-28057
 URL: https://issues.apache.org/jira/browse/SPARK-28057
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


Add an implementation of the {{clone}} method in {{TreeNode}} for de-duplicating 
instances in the LogicalPlan tree. This is a prerequisite for SPARK-23128.
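
As a rough illustration of the intent (a toy node type, not Spark's {{TreeNode}}), a deep-copying clone ensures the cloned tree shares no instances with the original:
{code:scala}
// Toy tree node used only to illustrate a deep-copying clone; Spark's actual
// TreeNode.clone is implemented on top of its own child-handling machinery.
case class Node(name: String, children: Seq[Node]) {
  def deepClone(): Node = copy(children = children.map(_.deepClone()))
}

object CloneDemo extends App {
  val shared = Node("scan", Nil)
  val plan = Node("join", Seq(shared, shared)) // the same instance appears twice
  val cloned = plan.deepClone()

  println(cloned == plan)                         // true: equal by value
  println(cloned.children(0) eq plan.children(0)) // false: no shared instances
}
{code}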






[jira] [Created] (SPARK-27824) Make rule EliminateResolvedHint idempotent

2019-05-23 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27824:
---

 Summary: Make rule EliminateResolvedHint idempotent
 Key: SPARK-27824
 URL: https://issues.apache.org/jira/browse/SPARK-27824
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


This is to make sure we get the correct outcome even if the 
{{EliminateResolvedHint}} rule gets applied more than once.
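
Idempotence here means that applying the rule a second time must leave the plan unchanged. A toy sketch of the property (not the actual {{EliminateResolvedHint}} code):
{code:scala}
// Toy plan and rule, only to illustrate the idempotence requirement:
// rule(rule(plan)) must equal rule(plan).
sealed trait Plan
case class Hint(child: Plan) extends Plan
case class Scan(table: String) extends Plan

object EliminateHint {
  def apply(plan: Plan): Plan = plan match {
    case Hint(child) => apply(child) // strip all hint wrappers
    case other       => other        // already hint-free: applying again is a no-op
  }
}

object IdempotenceDemo extends App {
  val plan  = Hint(Hint(Scan("t")))
  val once  = EliminateHint(plan)
  val twice = EliminateHint(once)
  println(once == twice) // true: the outcome is the same no matter how often it runs
}
{code}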






[jira] [Created] (SPARK-27783) Add customizable hint error handler

2019-05-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27783:
---

 Summary: Add customizable hint error handler
 Key: SPARK-27783
 URL: https://issues.apache.org/jira/browse/SPARK-27783
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


Add a customizable hint error handler, with the default behavior being to log 
warnings.
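
A hedged sketch of what such a handler could look like; the trait and method names below are illustrative only, not the interface that was actually added to Spark:
{code:scala}
import java.util.logging.Logger

// Illustrative (hypothetical) handler interface: analysis code reports hint
// problems to the handler instead of hard-coding a behavior.
trait HintErrorHandler {
  def hintNotRecognized(name: String): Unit
  def hintRelationsNotFound(name: String, relations: Seq[String]): Unit
}

// Default behavior: just log warnings, matching the description above.
object LoggingHintErrorHandler extends HintErrorHandler {
  private val log = Logger.getLogger(getClass.getName)

  override def hintNotRecognized(name: String): Unit =
    log.warning(s"Unrecognized hint: $name")

  override def hintRelationsNotFound(name: String, relations: Seq[String]): Unit =
    log.warning(s"Could not find relation(s) ${relations.mkString(", ")} for hint $name")
}
{code}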






[jira] [Created] (SPARK-27236) Refactor log-appender pattern in tests

2019-03-21 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27236:
---

 Summary: Refactor log-appender pattern in tests
 Key: SPARK-27236
 URL: https://issues.apache.org/jira/browse/SPARK-27236
 Project: Spark
  Issue Type: Improvement
  Components: Tests
Affects Versions: 2.4.0
Reporter: Maryann Xue


There are a few places in tests where similar patterns of "withLogAppender()" 
are used. This seems to be a common requirement across tests, so it can be made 
into a utility method.
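
A sketch of such a utility, assuming the log4j 1.x API that Spark 2.4 tests use; the helper name mirrors the pattern described above, but the exact signature that lands in Spark may differ:
{code:scala}
import org.apache.log4j.{AppenderSkeleton, Logger}
import org.apache.log4j.spi.LoggingEvent
import scala.collection.mutable.ArrayBuffer

object WithLogAppenderSketch extends App {
  // Collects every event logged while attached to the root logger.
  class LogAppender extends AppenderSkeleton {
    val events = new ArrayBuffer[LoggingEvent]()
    override def append(event: LoggingEvent): Unit = events += event
    override def close(): Unit = {}
    override def requiresLayout(): Boolean = false
  }

  // The shared helper: attach the appender, run the body, always detach.
  def withLogAppender[T](appender: LogAppender)(body: => T): T = {
    val root = Logger.getRootLogger
    root.addAppender(appender)
    try body finally root.removeAppender(appender)
  }

  val appender = new LogAppender
  withLogAppender(appender) {
    Logger.getLogger("test").warn("something noteworthy")
  }
  assert(appender.events.exists(_.getRenderedMessage.contains("noteworthy")))
}
{code}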






[jira] [Created] (SPARK-27225) Implement join strategy hints

2019-03-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27225:
---

 Summary: Implement join strategy hints
 Key: SPARK-27225
 URL: https://issues.apache.org/jira/browse/SPARK-27225
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


Extend the existing BROADCAST join hint by implementing other join strategy 
hints corresponding to the rest of Spark's existing join strategies: 
shuffle-hash, sort-merge, and cartesian-product. Broadcast-nested-loop will use 
the BROADCAST hint as it does now.
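
For illustration, here is roughly how the new hints would be requested from the Dataset API once implemented (hint names are indicative; the planner treats them as requests, not guarantees):
{code:scala}
import org.apache.spark.sql.SparkSession

object JoinHintSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("join-hints").getOrCreate()

  val left  = spark.range(1000).toDF("id")
  val right = spark.range(1000).toDF("id")

  left.hint("broadcast").join(right, "id")    // existing broadcast hint
  left.hint("shuffle_hash").join(right, "id") // request a shuffle-hash join
  left.hint("merge").join(right, "id")        // request a sort-merge join
  // A "shuffle_replicate_nl" hint would similarly request a cartesian-product join.

  spark.stop()
}
{code}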






[jira] [Updated] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-27223:

Summary: Remove private methods that skip conversion when passing user 
schemas for constructing a DataFrame  (was: Remove private methods that allow 
no conversion when passing user schemas for constructing a DataFrame)

> Remove private methods that skip conversion when passing user schemas for 
> constructing a DataFrame
> --
>
> Key: SPARK-27223
> URL: https://issues.apache.org/jira/browse/SPARK-27223
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Priority: Minor
>
> When passing in a user schema to create a DataFrame, there might be 
> mismatched nullability between the user schema and the actual data. All 
> related public interfaces now perform catalyst conversion using the 
> user-provided schema, which catches such mismatches to avoid runtime errors 
> later on. However, there are private methods that allow this conversion to be 
> skipped, so we need to remove these private methods, as they may lead to 
> confusion and potential issues.






[jira] [Created] (SPARK-27223) Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame

2019-03-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27223:
---

 Summary: Remove private methods that allow no conversion when 
passing user schemas for constructing a DataFrame
 Key: SPARK-27223
 URL: https://issues.apache.org/jira/browse/SPARK-27223
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


When passing in a user schema to create a DataFrame, there might be mismatched 
nullability between the user schema and the actual data. All related public 
interfaces now perform catalyst conversion using the user-provided schema, 
which catches such mismatches to avoid runtime errors later on. However, 
there are private methods that allow this conversion to be skipped, so we need 
to remove these private methods, as they may lead to confusion and potential 
issues.
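
A hedged example of the mismatch in question: the user schema marks a column non-nullable while the data contains a null. With the conversion in place the problem surfaces as soon as the data is materialized (the exact error message depends on the Spark version):
{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object NullabilityMismatchDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

  // The user schema claims "name" can never be null...
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false)))

  // ...but the data contains a null for "name".
  val rows = Seq(Row(1, "a"), Row(2, null))
  val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

  // The catalyst conversion applied by the public API catches the mismatch here,
  // rather than letting it cause a confusing failure further downstream.
  df.show()

  spark.stop()
}
{code}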






[jira] [Created] (SPARK-27088) Apply conf "spark.sql.optimizer.planChangeLog.level" to batch plan change in RuleExecutor

2019-03-07 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-27088:
---

 Summary: Apply conf "spark.sql.optimizer.planChangeLog.level" to 
batch plan change in RuleExecutor
 Key: SPARK-27088
 URL: https://issues.apache.org/jira/browse/SPARK-27088
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue


Similar to SPARK-25415, which made the log level for plan changes by each rule 
configurable, we can make the log level for plan changes by each batch 
configurable too and reuse the same configuration: 
"spark.sql.optimizer.planChangeLog.level".






[jira] [Updated] (SPARK-26840) Avoid cost-based join reorder in presence of join hints

2019-02-10 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-26840:

Description: This is a fix for 
[https://github.com/apache/spark/pull/23524],
 which did not stop cost-based join reorder when the {{CostBasedJoinReorder}} 
rule recurses down the tree and applies join reorder for nested joins with 
hints.

> Avoid cost-based join reorder in presence of join hints
> ---
>
> Key: SPARK-26840
> URL: https://issues.apache.org/jira/browse/SPARK-26840
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Maryann Xue
>Priority: Minor
>
> This is a fix for 
> [https://github.com/apache/spark/pull/23524],
>  which did not stop cost-based join reorder when the {{CostBasedJoinReorder}} 
> rule recurses down the tree and applies join reorder for nested joins with 
> hints.






[jira] [Created] (SPARK-26840) Avoid cost-based join reorder in presence of join hints

2019-02-06 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-26840:
---

 Summary: Avoid cost-based join reorder in presence of join hints
 Key: SPARK-26840
 URL: https://issues.apache.org/jira/browse/SPARK-26840
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Maryann Xue









[jira] [Created] (SPARK-26065) Change query hint from a `LogicalPlan` to a field

2018-11-14 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-26065:
---

 Summary: Change query hint from a `LogicalPlan` to a field
 Key: SPARK-26065
 URL: https://issues.apache.org/jira/browse/SPARK-26065
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


The existing query hint implementation relies on a logical plan node, 
{{ResolvedHint}}, to store query hints in logical plans, and on {{Statistics}} 
in physical plans. Since {{ResolvedHint}} is not really a logical operator and 
can break the pattern matching for existing and future optimization rules, it 
is as much an issue to the Optimizer as the old {{AnalysisBarrier}} was to the 
Analyzer.

Given that all our query hints are either 1) a join hint, i.e., the broadcast 
hint, or 2) a re-partition hint, which is indeed an operator, we only need to 
add a hint field on the {{Join}} plan, and that will be a good enough solution 
for current hint usage.
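
A toy sketch of the difference (simplified stand-ins, not Catalyst's actual classes): the hint moves from being a wrapper node in the tree to being a plain field on the join:
{code:scala}
sealed trait Plan
case class Scan(table: String) extends Plan

// Before: the hint is a plan node of its own, so rules matching Join(left, right)
// may unexpectedly see a ResolvedHint node in between.
case class ResolvedHint(child: Plan, broadcast: Boolean) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

// After: the hint is carried as a field on the join node itself.
case class JoinHint(broadcastLeft: Boolean, broadcastRight: Boolean)
case class HintedJoin(left: Plan, right: Plan, hint: JoinHint) extends Plan

object HintAsFieldDemo extends App {
  val before = Join(ResolvedHint(Scan("small"), broadcast = true), Scan("big"))
  val after  = HintedJoin(Scan("small"), Scan("big"), JoinHint(broadcastLeft = true, broadcastRight = false))
  println(before)
  println(after)
}
{code}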






[jira] [Updated] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate

2018-11-01 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25914:

Description: 
Currently the Spark SQL logical Aggregate has two expression fields: 
{{groupingExpressions}} and {{aggregateExpressions}}, in which 
{{aggregateExpressions}} is actually the result expressions, or in other words, 
the project list in the SELECT clause.
  
 This would cause an exception while processing the following query:
{code:java}
SELECT concat('x', concat(a, 's'))
FROM testData2
GROUP BY concat(a, 's'){code}
 After optimization, the query becomes:
{code:java}
SELECT concat('x', a, 's')
FROM testData2
GROUP BY concat(a, 's'){code}
The optimization rule {{CombineConcats}} optimizes the expressions by 
flattening "concat" and causes the query to fail since the expression 
{{concat('x', a, 's')}} in the SELECT clause is neither referencing a grouping 
expression nor an aggregate expression.
  
 The problem is that we try to mix two operations in one operator, and worse, 
in one field: the group-and-aggregate operation and the project operation. 
There are two ways to solve this problem:
 1. Break the two operations into two logical operators, which means a group-by 
query can usually be mapped into a Project-over-Aggregate pattern.
 2. Break the two operations into multiple fields in the Aggregate operator, 
the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}}, 
or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the 
expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} 
would contain aggregate functions only, and {{resultExpressions}} would be the 
project list in the SELECT clause holding references to either 
{{groupingExpressions}} or {{aggregateExpressions}}.
  
 I would say option 1 is even clearer, but it would be more likely to break the 
pattern matching in existing optimization rules and thus require more changes 
in the compiler. So we'd probably want to go with option 2. That said, I suggest 
we achieve this goal through two iterative steps:
  
 Phase 1: Keep the current fields of logical Aggregate as 
{{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics 
of {{aggregateExpressions}} by replacing the grouping expressions with 
corresponding references to expressions in {{groupingExpressions}}. The 
aggregate expressions in  {{aggregateExpressions}} will remain the same.
  
 Phase 2: Add {{resultExpressions}} for the project list, and keep only 
aggregate expressions in {{aggregateExpressions}}.
  

  was:
Currently the Spark SQL logical Aggregate has two expression fields: 
{{groupingExpressions}} and {{aggregateExpressions}}, in which 
{{aggregateExpressions}} is actually the result expressions, or in other words, 
the project list in the SELECT clause.
  
 This would cause an exception while processing the following query:
{code:java}
SELECT concat('x', concat(a, 's'))
FROM testData2
GROUP BY concat(a, 's'){code}
 After optimization, the query becomes:
{code:java}
SELECT concat('x', a, 's')
FROM testData2
GROUP BY concat(a, 's'){code}
The optimization rule {{CombineConcats}} optimizes the expressions by 
flattening "concat" and causes the query to fail since the expression 
{{concat('x', a, 's')}} in the SELECT clause is neither referencing a grouping 
expression nor an aggregate expression.
  
 The problem is that we try to mix two operations in one operator, and worse, 
in one field: the group-and-aggregate operation and the project operation. 
There are two ways to solve this problem:
 1. Break the two operations into two logical operators, which means a group-by 
query can usually be mapped into a Project-over-Aggregate pattern.
 2. Break the two operations into multiple fields in the Aggregate operator, 
the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}}, 
or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the 
expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} 
would contain aggregate functions only, and {{resultExpressions}} would be the 
project list in the SELECT clause holding references to either 
{{groupingExpressions}} or {{aggregateExpressions}}.
  
 I would say option 1 is even clearer, but it would be more likely to break the 
pattern matching in existing optimization rules and thus require more changes 
in the compiler. So we'd probably want to go with option 2. That said, I suggest 
we achieve this goal through two iterative steps:
  
 Phase 1: Keep the current fields of logical Aggregate as 
{{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics 
of {{aggregateExpressions}} by replacing the grouping expressions with 
corresponding references to expressions in {{groupingExpressions}}. The 
aggregate expressions in  {{aggregateExpressions}} will remain the same.
  
 Phase 2: Add {{resultExpressions}} for the project list, and keep only 
aggregate expressions in {{aggregateExpressions}}.

[jira] [Created] (SPARK-25916) Add `resultExpressions` in logical `Aggregate`

2018-11-01 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25916:
---

 Summary: Add `resultExpressions` in logical `Aggregate`
 Key: SPARK-25916
 URL: https://issues.apache.org/jira/browse/SPARK-25916
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


See parent Jira description






[jira] [Created] (SPARK-25915) Replace grouping expressions with references in `aggregateExpressions` of logical `Aggregate`

2018-11-01 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25915:
---

 Summary: Replace grouping expressions with references in 
`aggregateExpressions` of logical `Aggregate`
 Key: SPARK-25915
 URL: https://issues.apache.org/jira/browse/SPARK-25915
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


See parent Jira description.






[jira] [Updated] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate

2018-11-01 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25914:

Description: 
Currently the Spark SQL logical Aggregate has two expression fields: 
{{groupingExpressions}} and {{aggregateExpressions}}, in which 
{{aggregateExpressions}} is actually the result expressions, or in other words, 
the project list in the SELECT clause.
  
 This would cause an exception while processing the following query:
{code:java}
SELECT concat('x', concat(a, 's'))
FROM testData2
GROUP BY concat(a, 's'){code}
 After optimization, the query becomes:
{code:java}
SELECT concat('x', a, 's')
FROM testData2
GROUP BY concat(a, 's'){code}
The optimization rule {{CombineConcats}} optimizes the expressions by 
flattening "concat" and causes the query to fail since the expression 
{{concat('x', a, 's')}} in the SELECT clause is neither referencing a grouping 
expression nor an aggregate expression.
  
 The problem is that we try to mix two operations in one operator, and worse, 
in one field: the group-and-aggregate operation and the project operation. 
There are two ways to solve this problem:
 1. Break the two operations into two logical operators, which means a group-by 
query can usually be mapped into a Project-over-Aggregate pattern.
 2. Break the two operations into multiple fields in the Aggregate operator, 
the same way we do for physical aggregate classes (e.g., {{HashAggregateExec}}, 
or {{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the 
expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} 
would contain aggregate functions only, and {{resultExpressions}} would be the 
project list in the SELECT clause holding references to either 
{{groupingExpressions}} or {{aggregateExpressions}}.
  
 I would say option 1 is even clearer, but it would be more likely to break the 
pattern matching in existing optimization rules and thus require more changes 
in the compiler. So we'd probably want to go with option 2. That said, I suggest 
we achieve this goal through two iterative steps:
  
 Phase 1: Keep the current fields of logical Aggregate as 
{{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics 
of {{aggregateExpressions}} by replacing the grouping expressions with 
corresponding references to expressions in {{groupingExpressions}}. The 
aggregate expressions in  {{aggregateExpressions}} will remain the same.
  
 Phase 2: Add {{resultExpressions}} for the project list, and keep only 
aggregate expressions in {{aggregateExpressions}}.
  

  was:
Currently the Spark SQL logical Aggregate has two expression fields: 
{{groupingExpressions}} and {{aggregateExpressions}}, in which 
{{aggregateExpressions}} is actually the result expressions, or in other words, 
the project list in the SELECT clause.
 
This would cause an exception while processing the following query:
 
  SELECT concat('x', concat(a, 's'))
  FROM testData2
  GROUP BY concat(a, 's')
 
After optimization, the query becomes:
 
  SELECT concat('x', a, 's')
  FROM testData2
  GROUP BY concat(a, 's')
 
The optimization rule {{CombineConcats}} optimizes the expressions by 
flattening "concat" and causes the query to fail since the expression 
{{concat('x', a, 's')}} in the SELECT clause is neither referencing a grouping 
expression nor an aggregate expression.
 
The problem is that we try to mix two operations in one operator, and worse, in 
one field: the group-and-aggregate operation and the project operation. There 
are two ways to solve this problem:
1. Break the two operations into two logical operators, which means a group-by 
query can usually be mapped into a Project-over-Aggregate pattern.
2. Break the two operations into multiple fields in the Aggregate operator, the 
same way we do for physical aggregate classes (e.g., {{HashAggregateExec}}, or 
{{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the 
expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} 
would contain aggregate functions only, and {{resultExpressions}} would be the 
project list in the SELECT clause holding references to either 
{{groupingExpressions}} or {{aggregateExpressions}}.
 
I would say option 1 is even clearer, but it would be more likely to break the 
pattern matching in existing optimization rules and thus require more changes 
in the compiler. So we'd probably want to go with option 2. That said, I suggest 
we achieve this goal through two iterative steps:
 
Phase 1: Keep the current fields of logical Aggregate as 
{{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics 
of {{aggregateExpressions}} by replacing the grouping expressions with 
corresponding references to expressions in {{groupingExpressions}}. The 
aggregate expressions in  {{aggregateExpressions}} will remain the same.
 
Phase 2: Add {{resultExpressions}} for the project list, and keep only 
aggregate expressions in {{aggregateExpressions}}.

[jira] [Created] (SPARK-25914) Separate projection from grouping and aggregate in logical Aggregate

2018-11-01 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25914:
---

 Summary: Separate projection from grouping and aggregate in 
logical Aggregate
 Key: SPARK-25914
 URL: https://issues.apache.org/jira/browse/SPARK-25914
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


Currently the Spark SQL logical Aggregate has two expression fields: 
{{groupingExpressions}} and {{aggregateExpressions}}, in which 
{{aggregateExpressions}} is actually the result expressions, or in other words, 
the project list in the SELECT clause.
 
This would cause an exception while processing the following query:
 
  SELECT concat('x', concat(a, 's'))
  FROM testData2
  GROUP BY concat(a, 's')
 
After optimization, the query becomes:
 
  SELECT concat('x', a, 's')
  FROM testData2
  GROUP BY concat(a, 's')
 
The optimization rule {{CombineConcats}} optimizes the expressions by 
flattening "concat" and causes the query to fail since the expression 
{{concat('x', a, 's')}} in the SELECT clause is neither referencing a grouping 
expression nor an aggregate expression.
 
The problem is that we try to mix two operations in one operator, and worse, in 
one field: the group-and-aggregate operation and the project operation. There 
are two ways to solve this problem:
1. Break the two operations into two logical operators, which means a group-by 
query can usually be mapped into a Project-over-Aggregate pattern.
2. Break the two operations into multiple fields in the Aggregate operator, the 
same way we do for physical aggregate classes (e.g., {{HashAggregateExec}}, or 
{{SortAggregateExec}}). Thus, {{groupingExpressions}} would still be the 
expressions from the GROUP BY clause (as before), but {{aggregateExpressions}} 
would contain aggregate functions only, and {{resultExpressions}} would be the 
project list in the SELECT clause holding references to either 
{{groupingExpressions}} or {{aggregateExpressions}}.
 
I would say option 1 is even clearer, but it would be more likely to break the 
pattern matching in existing optimization rules and thus require more changes 
in the compiler. So we'd probably want to go with option 2. That said, I suggest 
we achieve this goal through two iterative steps:
 
Phase 1: Keep the current fields of logical Aggregate as 
{{groupingExpressions}} and {{aggregateExpressions}}, but change the semantics 
of {{aggregateExpressions}} by replacing the grouping expressions with 
corresponding references to expressions in {{groupingExpressions}}. The 
aggregate expressions in  {{aggregateExpressions}} will remain the same.
 
Phase 2: Add {{resultExpressions}} for the project list, and keep only 
aggregate expressions in {{aggregateExpressions}}.
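
A simplified sketch of the field layout option 2 proposes (toy expression classes, not Catalyst's), with the SELECT list kept in {{resultExpressions}} and referring to the other two lists only by reference:
{code:scala}
// Toy stand-ins for Catalyst expressions, only to show the proposed field split.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: String) extends Expr
case class Concat(children: Seq[Expr]) extends Expr
case class Ref(name: String) extends Expr // reference to a grouping/aggregate output

case class Aggregate(
    groupingExpressions: Seq[Expr],  // GROUP BY clause, as today
    aggregateExpressions: Seq[Expr], // aggregate functions only
    resultExpressions: Seq[Expr])    // SELECT list, built from references

object AggregateShapeDemo extends App {
  // SELECT concat('x', concat(a, 's')) FROM testData2 GROUP BY concat(a, 's')
  val agg = Aggregate(
    groupingExpressions  = Seq(Concat(Seq(Col("a"), Lit("s")))),
    aggregateExpressions = Nil,
    resultExpressions    = Seq(Concat(Seq(Lit("x"), Ref("group_0")))))
  println(agg)
}
{code}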
 






[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely

2018-10-11 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25690:

Description: 
This was fixed in SPARK-24891 and was then broken by SPARK-25044.

The unit test in {{AnalysisSuite}} added in SPARK-24891 should have failed but 
didn't because it wasn't properly updated after the {{ScalaUDF}} constructor 
signature change. In the meantime, the other two end-to-end tests added in 
SPARK-24891 were shadowed by SPARK-24865.

So the unit test mentioned above, if updated properly, can reproduce this issue:
{code:java}
test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
  val a = testRelation.output(0)
  val func = (x: Int, y: Int) => x + y
  val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil,
    nullableTypes = false :: false :: Nil)
  val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil,
    nullableTypes = false :: false :: Nil)
  val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
  comparePlans(plan.analyze, plan.analyze.analyze)
}{code}

  was:
This was fixed in SPARK-24891 and was then broken by SPARK-25044.

The tests added in SPARK-24891 were not good enough and the expected failures 
were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. 
Code changes and tests in 
[https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72]
 can help reproduce the issue.


> Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied 
> infinitely
> ---
>
> Key: SPARK-25690
> URL: https://issues.apache.org/jira/browse/SPARK-25690
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Priority: Major
>
> This was fixed in SPARK-24891 and was then broken by SPARK-25044.
> The unit test in {{AnalysisSuite}} added in SPARK-24891 should have failed 
> but didn't because it wasn't properly updated after the {{ScalaUDF}} 
> constructor signature change. In the meantime, the other two end-to-end tests 
> added in SPARK-24891 were shadowed by SPARK-24865.
> So the unit test mentioned above, if updated properly, can reproduce this 
> issue:
> {code:java}
> test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
>   val a = testRelation.output(0)
>   val func = (x: Int, y: Int) => x + y
>   val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil,
>     nullableTypes = false :: false :: Nil)
>   val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil,
>     nullableTypes = false :: false :: Nil)
>   val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
>   comparePlans(plan.analyze, plan.analyze.analyze)
> }{code}






[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent

2018-10-09 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25650:

Description: 
Rules like {{HandleNullInputsForUDF}} 
(https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply 
new changes to a plan indefinitely) and can cause problems like SQL cache 
mismatching.
 Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
batch should stabilize after the number of runs specified. Once-policy should 
be considered a performance improvement, an assumption that the rule can 
stabilize after just one run, rather than an assumption that the rule won't be 
applied more than once. Those once-policy rules should be able to run fine with 
a fixed-point policy as well.
 Currently we already have a check for fixed-point that throws an exception if 
the maximum number of runs is reached and the plan is still changing. Here, in 
this PR, a similar check is added for once-policy, which throws an exception if 
the plan changes between the first run and the second run of a once-policy rule.

To reproduce this issue, go to [https://github.com/apache/spark/pull/22060], 
apply the changes and remove the specific rule from the whitelist 
https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R71.

  was:
Rules like {{HandleNullInputsForUDF}} 
(https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply 
new changes to a plan indefinitely) and can cause problems like SQL cache 
mismatching.
 Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
batch should stabilize after the number of runs specified. Once-policy should 
be considered a performance improvement, an assumption that the rule can 
stabilize after just one run, rather than an assumption that the rule won't be 
applied more than once. Those once-policy rules should be able to run fine with 
a fixed-point policy as well.
 Currently we already have a check for fixed-point that throws an exception if 
the maximum number of runs is reached and the plan is still changing. Here, in 
this PR, a similar check is added for once-policy, which throws an exception if 
the plan changes between the first run and the second run of a once-policy rule.

To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] and 
apply the changes.


> Make analyzer rules used in once-policy idempotent
> --
>
> Key: SPARK-25650
> URL: https://issues.apache.org/jira/browse/SPARK-25650
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Maryann Xue
>Priority: Major
>
> Rules like {{HandleNullInputsForUDF}} 
> (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can 
> apply new changes to a plan indefinitely) and can cause problems like SQL 
> cache mismatching.
>  Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
> batch should stabilize after the number of runs specified. Once-policy should 
> be considered a performance improvement, an assumption that the rule can 
> stabilize after just one run, rather than an assumption that the rule won't be 
> applied more than once. Those once-policy rules should be able to run fine 
> with a fixed-point policy as well.
>  Currently we already have a check for fixed-point that throws an exception if 
> the maximum number of runs is reached and the plan is still changing. Here, in 
> this PR, a similar check is added for once-policy, which throws an exception 
> if the plan changes between the first run and the second run of a once-policy 
> rule.
> To reproduce this issue, go to [https://github.com/apache/spark/pull/22060], 
> apply the changes and remove the specific rule from the whitelist 
> https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R71.






[jira] [Created] (SPARK-25691) Analyzer rule "AliasViewChild" does not stabilize

2018-10-09 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25691:
---

 Summary: Analyzer rule "AliasViewChild" does not stabilize
 Key: SPARK-25691
 URL: https://issues.apache.org/jira/browse/SPARK-25691
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


To reproduce the issue: 
https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R73.






[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely

2018-10-09 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25690:

Issue Type: Sub-task  (was: Bug)
Parent: SPARK-25650

> Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied 
> infinitely
> ---
>
> Key: SPARK-25690
> URL: https://issues.apache.org/jira/browse/SPARK-25690
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Assignee: Sean Owen
>Priority: Major
> Fix For: 2.4.0
>
>
> This was fixed in SPARK-24891 and was then broken by SPARK-25044.
> The tests added in SPARK-24891 were not good enough and the expected failures 
> were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. 
> Code changes and tests in 
> [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72]
>  can help reproduce the issue.






[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely

2018-10-09 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25690:

Issue Type: Bug  (was: Sub-task)
Parent: (was: SPARK-14220)

> Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied 
> infinitely
> ---
>
> Key: SPARK-25690
> URL: https://issues.apache.org/jira/browse/SPARK-25690
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Assignee: Sean Owen
>Priority: Major
> Fix For: 2.4.0
>
>
> This was fixed in SPARK-24891 and was then broken by SPARK-25044.
> The tests added in SPARK-24891 were not good enough and the expected failures 
> were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. 
> Code changes and tests in 
> [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72]
>  can help reproduce the issue.






[jira] [Updated] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely

2018-10-09 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25690:

Description: 
This was fixed in SPARK-24891 and was then broken by SPARK-25044.

The tests added in SPARK-24891 were not good enough and the expected failures 
were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. 
Code changes and tests in 
[https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72]
 can help reproduce the issue.

  was:
A few SQL-related tests fail in Scala 2.12, such as UDFSuite's "SPARK-24891 Fix 
HandleNullInputsForUDF rule":
{code:java}
- SPARK-24891 Fix HandleNullInputsForUDF rule *** FAILED ***
Results do not match for query:
...
== Results ==

== Results ==
!== Correct Answer - 3 == == Spark Answer - 3 ==
!struct<> struct
![0,10,null] [0,10,0]
![1,12,null] [1,12,1]
![2,14,null] [2,14,2] (QueryTest.scala:163){code}
You can kind of get what's going on by reading the test:
{code:java}
test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
  // assume(!ClosureCleanerSuite2.supportsLMFs)
  // This test won't test what it intends to in 2.12, as lambda metafactory closures
  // have arg types that are not primitive, but Object
  val udf1 = udf({(x: Int, y: Int) => x + y})
  val df = spark.range(0, 3).toDF("a")
    .withColumn("b", udf1($"a", udf1($"a", lit(10))))
    .withColumn("c", udf1($"a", lit(null)))
  val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed

  comparePlans(df.logicalPlan, plan)
  checkAnswer(
    df,
    Seq(
      Row(0, 10, null),
      Row(1, 12, null),
      Row(2, 14, null)))
}{code}
 

It seems that the closure that is fed in as a UDF changes behavior, in that 
primitive-type arguments are handled differently. For example, an Int 
argument, when fed 'null', acts like 0.

I'm sure it's a difference in the LMF closure and how its types are understood, 
but I'm not exactly sure of the cause yet.


> Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied 
> infinitely
> ---
>
> Key: SPARK-25690
> URL: https://issues.apache.org/jira/browse/SPARK-25690
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Assignee: Sean Owen
>Priority: Major
> Fix For: 2.4.0
>
>
> This was fixed in SPARK-24891 and was then broken by SPARK-25044.
> The tests added in SPARK-24891 were not good enough and the expected failures 
> were shadowed by SPARK-24865. For more details, please refer to SPARK-25650. 
> Code changes and tests in 
> [https://github.com/apache/spark/pull/22060/files#diff-f70523b948b7af21abddfa3ab7e1d7d6R72]
>  can help reproduce the issue.






[jira] [Created] (SPARK-25690) Analyzer rule "HandleNullInputsForUDF" does not stabilize and can be applied infinitely

2018-10-09 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25690:
---

 Summary: Analyzer rule "HandleNullInputsForUDF" does not stabilize 
and can be applied infinitely
 Key: SPARK-25690
 URL: https://issues.apache.org/jira/browse/SPARK-25690
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core, SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue
Assignee: Sean Owen
 Fix For: 2.4.0


A few SQL-related tests fail in Scala 2.12, such as UDFSuite's "SPARK-24891 Fix 
HandleNullInputsForUDF rule":
{code:java}
- SPARK-24891 Fix HandleNullInputsForUDF rule *** FAILED ***
Results do not match for query:
...
== Results ==

== Results ==
!== Correct Answer - 3 == == Spark Answer - 3 ==
!struct<> struct
![0,10,null] [0,10,0]
![1,12,null] [1,12,1]
![2,14,null] [2,14,2] (QueryTest.scala:163){code}
You can kind of get what's going on by reading the test:
{code:java}
test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
  // assume(!ClosureCleanerSuite2.supportsLMFs)
  // This test won't test what it intends to in 2.12, as lambda metafactory closures
  // have arg types that are not primitive, but Object
  val udf1 = udf({(x: Int, y: Int) => x + y})
  val df = spark.range(0, 3).toDF("a")
    .withColumn("b", udf1($"a", udf1($"a", lit(10))))
    .withColumn("c", udf1($"a", lit(null)))
  val plan = spark.sessionState.executePlan(df.logicalPlan).analyzed

  comparePlans(df.logicalPlan, plan)
  checkAnswer(
    df,
    Seq(
      Row(0, 10, null),
      Row(1, 12, null),
      Row(2, 14, null)))
}{code}
 

It seems that the closure that is fed in as a UDF changes behavior, in that 
primitive-type arguments are handled differently. For example, an Int 
argument, when fed 'null', acts like 0.

I'm sure it's a difference in the LMF closure and how its types are understood, 
but I'm not exactly sure of the cause yet.






[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent

2018-10-05 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25650:

Issue Type: Task  (was: Bug)

> Make analyzer rules used in once-policy idempotent
> --
>
> Key: SPARK-25650
> URL: https://issues.apache.org/jira/browse/SPARK-25650
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Maryann Xue
>Priority: Major
>
> Rules like {{HandleNullInputsForUDF}} 
> (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can 
> apply new changes to a plan indefinitely) and can cause problems like SQL 
> cache mismatching.
> Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
> batch should stabilize after the number of runs specified. Once-policy should 
> be considered a performance improvement, an assumption that the rule can 
> stabilize after just one run, rather than an assumption that the rule won't be 
> applied more than once. Those once-policy rules should be able to run fine 
> with a fixed-point policy as well.
> Currently we already have a check for fixed-point that throws an exception if 
> the maximum number of runs is reached and the plan is still changing. Here, in 
> this PR, a similar check is added for once-policy, which throws an exception 
> if the plan changes between the first run and the second run of a once-policy 
> rule.






[jira] [Updated] (SPARK-25650) Make analyzer rules used in once-policy idempotent

2018-10-05 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25650:

Description: 
Rules like {{HandleNullInputsForUDF}} 
(https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply 
new changes to a plan indefinitely) and can cause problems like SQL cache 
mismatching.
 Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
batch should stabilize after the number of runs specified. Once-policy should 
be considered a performance improvement, an assumption that the rule can 
stabilize after just one run, rather than an assumption that the rule won't be 
applied more than once. Those once-policy rules should be able to run fine with 
a fixed-point policy as well.
 Currently we already have a check for fixed-point that throws an exception if 
the maximum number of runs is reached and the plan is still changing. Here, in 
this PR, a similar check is added for once-policy, which throws an exception if 
the plan changes between the first run and the second run of a once-policy rule.

To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] and 
apply the changes.

  was:
Rules like {{HandleNullInputsForUDF}} 
(https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply 
new changes to a plan indefinitely) and can cause problems like SQL cache 
mismatching.
Ideally, all rules whether in a once-policy batch or a fixed-point-policy batch 
should stabilize after the number of runs specified. Once-policy should be 
considered a performance improvement, an assumption that the rule can stabilize 
after just one run, rather than an assumption that the rule won't be applied 
more than once. Those once-policy rules should be able to run fine with 
a fixed-point policy as well.
Currently we already have a check for fixed-point that throws an exception if 
the maximum number of runs is reached and the plan is still changing. Here, in this 
PR, a similar check is added for once-policy, which throws an exception if the 
plan changes between the first run and the second run of a once-policy rule.


> Make analyzer rules used in once-policy idempotent
> --
>
> Key: SPARK-25650
> URL: https://issues.apache.org/jira/browse/SPARK-25650
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Maryann Xue
>Priority: Major
>
> Rules like {{HandleNullInputsForUDF}} 
> (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can 
> apply new changes to a plan indefinitely) and can cause problems like SQL 
> cache mismatching.
>  Ideally, all rules whether in a once-policy batch or a fixed-point-policy 
> batch should stabilize after the number of runs specified. Once-policy should 
> be considered a performance improvement, an assumption that the rule can 
> stabilize after just one run, rather than an assumption that the rule won't be 
> applied more than once. Those once-policy rules should be able to run fine 
> with a fixed-point policy as well.
>  Currently we already have a check for fixed-point that throws an exception if 
> the maximum number of runs is reached and the plan is still changing. Here, in 
> this PR, a similar check is added for once-policy, which throws an exception 
> if the plan changes between the first run and the second run of a once-policy 
> rule.
> To reproduce this issue, go to [https://github.com/apache/spark/pull/22060] 
> and apply the changes.






[jira] [Created] (SPARK-25650) Make analyzer rules used in once-policy idempotent

2018-10-05 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25650:
---

 Summary: Make analyzer rules used in once-policy idempotent
 Key: SPARK-25650
 URL: https://issues.apache.org/jira/browse/SPARK-25650
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.2
Reporter: Maryann Xue


Rules like {{HandleNullInputsForUDF}} 
(https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (can apply 
new changes to a plan indefinitely) and can cause problems like SQL cache 
mismatching.
Ideally, all rules whether in a once-policy batch or a fixed-point-policy batch 
should stabilize after the number of runs specified. Once-policy should be 
considered a performance improvement, an assumption that the rule can stabilize 
after just one run, rather than an assumption that the rule won't be applied 
more than once. Those once-policy rules should be able to run fine with 
a fixed-point policy as well.
Currently we already have a check for fixed-point that throws an exception if 
the maximum number of runs is reached and the plan is still changing. Here, in this 
PR, a similar check is added for once-policy, which throws an exception if the 
plan changes between the first run and the second run of a once-policy rule.
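
A minimal sketch of the kind of check described above (a toy executor loop, not RuleExecutor's actual code): run a once-policy batch a second time and fail if the plan changes again:
{code:scala}
// Toy once-policy check: a second pass over the rules must be a no-op.
object OncePolicyCheckDemo extends App {
  type Plan = List[String]
  type Rule = Plan => Plan

  def runOnceBatch(rules: Seq[Rule], plan: Plan): Plan = {
    val afterFirstRun  = rules.foldLeft(plan)((p, r) => r(p))
    val afterSecondRun = rules.foldLeft(afterFirstRun)((p, r) => r(p))
    if (afterSecondRun != afterFirstRun) {
      throw new IllegalStateException("Once-policy batch is not idempotent")
    }
    afterFirstRun
  }

  val idempotentRule: Rule = _.filterNot(_ == "hint")      // removing hints twice changes nothing
  val nonIdempotentRule: Rule = plan => "wrapper" :: plan  // keeps adding a node on every run

  println(runOnceBatch(Seq(idempotentRule), List("hint", "scan"))) // List(scan)
  runOnceBatch(Seq(nonIdempotentRule), List("scan"))               // throws IllegalStateException
}
{code}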






[jira] [Created] (SPARK-25505) The output order of grouping columns in Pivot is different from the input order

2018-09-21 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25505:
---

 Summary: The output order of grouping columns in Pivot is 
different from the input order
 Key: SPARK-25505
 URL: https://issues.apache.org/jira/browse/SPARK-25505
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


For example,
{code}
SELECT * FROM (
  SELECT course, earnings, "a" as a, "z" as z, "b" as b, "y" as y, "c" as c, 
"x" as x, "d" as d, "w" as w
  FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ('dotNET', 'Java')
)
{code}
The output columns should be "a, z, b, y, c, x, d, w, ..." but they currently 
come out as "a, b, c, d, w, x, y, z, ...".






[jira] [Updated] (SPARK-25450) PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation

2018-09-17 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25450:

Issue Type: Bug  (was: Improvement)

> PushProjectThroughUnion rule uses the same exprId for project expressions in 
> each Union child, causing mistakes in constant propagation
> ---
>
> Key: SPARK-25450
> URL: https://issues.apache.org/jira/browse/SPARK-25450
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maryann Xue
>Priority: Major
>







[jira] [Created] (SPARK-25450) PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation

2018-09-17 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25450:
---

 Summary: PushProjectThroughUnion rule uses the same exprId for 
project expressions in each Union child, causing mistakes in constant 
propagation
 Key: SPARK-25450
 URL: https://issues.apache.org/jira/browse/SPARK-25450
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.1
Reporter: Maryann Xue









[jira] [Updated] (SPARK-25423) Output "dataFilters" in DataSourceScanExec.metadata

2018-09-13 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25423:

Summary: Output "dataFilters" in DataSourceScanExec.metadata  (was: Output 
"dataFilters" in DataSourceScanExec.toString)

> Output "dataFilters" in DataSourceScanExec.metadata
> ---
>
> Key: SPARK-25423
> URL: https://issues.apache.org/jira/browse/SPARK-25423
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maryann Xue
>Priority: Trivial
>







[jira] [Created] (SPARK-25423) Output "dataFilters" in DataSourceScanExec.toString

2018-09-13 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25423:
---

 Summary: Output "dataFilters" in DataSourceScanExec.toString
 Key: SPARK-25423
 URL: https://issues.apache.org/jira/browse/SPARK-25423
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.1
Reporter: Maryann Xue









[jira] [Created] (SPARK-25415) Make plan change log in RuleExecutor configurable by SQLConf

2018-09-12 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25415:
---

 Summary: Make plan change log in RuleExecutor configurable by 
SQLConf
 Key: SPARK-25415
 URL: https://issues.apache.org/jira/browse/SPARK-25415
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.1
Reporter: Maryann Xue


In RuleExecutor, after applying a rule, if the plan has changed, the before and 
after plans will be logged at the "trace" level. At times, however, such 
information can be very helpful for debugging, so making the log level 
configurable in SQLConf would allow users to turn on the plan change log 
independently and save the trouble of tweaking log4j settings.
Meanwhile, filtering the plan change log for specific rules can also be very useful.
So I propose adding two confs:
1. spark.sql.optimizer.planChangeLog.level - set a specific log level for 
logging plan changes after a rule is applied.
2. spark.sql.optimizer.planChangeLog.rules - enable plan change logging only 
for a set of specified rules, separated by commas.
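
For example, once added, the two settings could be toggled at runtime roughly as follows (the rule class name is purely illustrative of the comma-separated format):
{code:scala}
// Assumes an existing SparkSession named `spark`.
// Log plan changes at WARN so they show up without touching log4j settings,
// and only for the listed rule(s); the rule name here is just an example.
spark.conf.set("spark.sql.optimizer.planChangeLog.level", "WARN")
spark.conf.set(
  "spark.sql.optimizer.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")
{code}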








[jira] [Updated] (SPARK-25063) Rename class KnowNotNull to KnownNotNull

2018-08-08 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-25063:

Description: It's a class name typo checked in through SPARK-24891  (was: 
It's a class name typo check-in through SPARK-24891)

> Rename class KnowNotNull to KnownNotNull
> 
>
> Key: SPARK-25063
> URL: https://issues.apache.org/jira/browse/SPARK-25063
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Priority: Trivial
>
> It's a class name typo checked in through SPARK-24891



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25063) Rename class KnowNotNull to KnownNotNull

2018-08-08 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-25063:
---

 Summary: Rename class KnowNotNull to KnownNotNull
 Key: SPARK-25063
 URL: https://issues.apache.org/jira/browse/SPARK-25063
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


It's a class name typo checked in through SPARK-24891



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24972) PivotFirst could not handle pivot columns of complex types

2018-07-30 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24972:
---

 Summary: PivotFirst could not handle pivot columns of complex types
 Key: SPARK-24972
 URL: https://issues.apache.org/jira/browse/SPARK-24972
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1
Reporter: Maryann Xue


{{PivotFirst}} did not handle complex types for pivot columns properly. As a 
result, the pivot column could not be matched with any pivot value, and the 
query always returned an empty result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24891) Fix HandleNullInputsForUDF rule

2018-07-23 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24891:

Summary: Fix HandleNullInputsForUDF rule  (was: Fix HandleNullInputsForUDF 
rule.)

> Fix HandleNullInputsForUDF rule
> ---
>
> Key: SPARK-24891
> URL: https://issues.apache.org/jira/browse/SPARK-24891
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maryann Xue
>Priority: Major
>
> The HandleNullInputsForUDF rule can generate new {{If}} node infinitely, thus 
> causing problems like match of SQL cache missed. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24891) Fix HandleNullInputsForUDF rule.

2018-07-23 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24891:
---

 Summary: Fix HandleNullInputsForUDF rule.
 Key: SPARK-24891
 URL: https://issues.apache.org/jira/browse/SPARK-24891
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1
Reporter: Maryann Xue


The HandleNullInputsForUDF rule can generate new {{If}} nodes indefinitely, thus 
causing problems such as missed SQL cache matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24802) Optimization Rule Exclusion

2018-07-13 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24802:

Description: 
Since Spark has provided fairly clear interfaces for adding user-defined 
optimization rules, it would be nice to have an easy-to-use interface for 
excluding an optimization rule from the Spark query optimizer as well.

This would make customizing the Spark optimizer easier and could sometimes help 
with debugging issues too.
 # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being 
a list of rule names separated by commas.
 # Modify the current {{batches}} method to remove the excluded rules from the 
default batches. Log the rules that have been excluded.
 # Split the existing default batches into "post-analysis batches" and 
"optimization batches" so that only rules in the "optimization batches" can be 
excluded.

  was:
Since Spark has provided fairly clear interfaces for adding user-defined 
optimization rules, it would be nice to have an easy-to-use interface for 
excluding an optimization rule from the Spark query optimizer as well.

This would make customizing Spark optimizer easier and sometimes could 
debugging issues too.
 # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being 
a list of rule names separated by comma.
 # Modify the current {{batches}} method to remove the excluded rules from the 
default batches. Log the rules that have been excluded.
 # Split the existing default batches into "post-analysis batches" and 
"optimization batches" so that only rules in the "optimization batches" can be 
excluded.

 


> Optimization Rule Exclusion
> ---
>
> Key: SPARK-24802
> URL: https://issues.apache.org/jira/browse/SPARK-24802
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
>
> Since Spark has provided fairly clear interfaces for adding user-defined 
> optimization rules, it would be nice to have an easy-to-use interface for 
> excluding an optimization rule from the Spark query optimizer as well.
> This would make customizing Spark optimizer easier and sometimes could 
> debugging issues too.
>  # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value 
> being a list of rule names separated by comma.
>  # Modify the current {{batches}} method to remove the excluded rules from 
> the default batches. Log the rules that have been excluded.
>  # Split the existing default batches into "post-analysis batches" and 
> "optimization batches" so that only rules in the "optimization batches" can 
> be excluded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24802) Optimization Rule Exclusion

2018-07-13 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24802:
---

 Summary: Optimization Rule Exclusion
 Key: SPARK-24802
 URL: https://issues.apache.org/jira/browse/SPARK-24802
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue


Since Spark has provided fairly clear interfaces for adding user-defined 
optimization rules, it would be nice to have an easy-to-use interface for 
excluding an optimization rule from the Spark query optimizer as well.

This would make customizing the Spark optimizer easier and could sometimes help 
with debugging issues too.
 # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value being 
a list of rule names separated by commas (see the usage sketch after this list).
 # Modify the current {{batches}} method to remove the excluded rules from the 
default batches. Log the rules that have been excluded.
 # Split the existing default batches into "post-analysis batches" and 
"optimization batches" so that only rules in the "optimization batches" can be 
excluded.
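As a rough usage sketch of item 1 above, assuming the conf lands under the 
proposed name (the rule names below are just examples of built-in optimizer rules):
{code:java}
// Hypothetical usage of the proposed conf, e.g. in spark-shell:
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate," +
  "org.apache.spark.sql.catalyst.optimizer.ColumnPruning")

// If these rules are excludable, the optimizer would then skip them for later queries.
spark.range(100).filter("id > 42").select("id").explain(true)
{code}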

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24790) Allow complex aggregate expressions in Pivot

2018-07-11 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24790:
---

 Summary: Allow complex aggregate expressions in Pivot
 Key: SPARK-24790
 URL: https://issues.apache.org/jira/browse/SPARK-24790
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Maryann Xue


In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of PIVOT's 
aggregate expressions was enforced, which allows only an AggregateExpression 
with or without an alias. Now we would like to relax the check to allow complex 
aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) + 1}}.
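A hedged sketch of the kind of query this change would enable (the table and 
column names below are hypothetical; the exact accepted forms depend on the 
final check):
{code:java}
// Hypothetical example: complex aggregate expressions wrapping sum(), which the
// current strict check rejects.
spark.sql(
  """SELECT *
    |FROM sales
    |PIVOT (
    |  ceil(sum(earnings)) AS rounded, sum(earnings) + 1 AS plus_one
    |  FOR course IN ('dotNET', 'Java')
    |)""".stripMargin).show()
{code}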



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24790) Allow complex aggregate expressions in Pivot

2018-07-11 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24790:

Description: In SPARK-24035, to conform with Oracle PIVOT syntax, a strict 
check of PIVOT's aggregate expressions was exerted, which allows only an 
AggregateExpression with or without alias. Now we would like to relax the check 
to allow complex aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) 
+ 1}}  (was: In SPARK-24035, to conform with Oracle PIVOT syntax, a strict 
check of PIVOT's aggregate expressions was exerted, which allows only an 
AggregateExpression with or without alias. Now we would like to relax to 
complex aggregate expressions, like {{ceil(sum(col1))}} or {{sum(col1) + 1}})

> Allow complex aggregate expressions in Pivot
> 
>
> Key: SPARK-24790
> URL: https://issues.apache.org/jira/browse/SPARK-24790
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Maryann Xue
>Priority: Minor
>
> In SPARK-24035, to conform with Oracle PIVOT syntax, a strict check of 
> PIVOT's aggregate expressions was exerted, which allows only an 
> AggregateExpression with or without alias. Now we would like to relax the 
> check to allow complex aggregate expressions, like {{ceil(sum(col1))}} or 
> {{sum(col1) + 1}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24696) ColumnPruning rule fails to remove extra Project

2018-06-29 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24696:

Description: 
The following test case would cause a "max iterations reached" error, i.e., an 
infinite loop in the optimizer.
{code}
   test("optimization infinite loop") {
withTempDir { dir =>
  val path = dir.getCanonicalPath
  import testImplicits._
  Seq((3, "a")).toDF("id", 
"value").write.format("parquet").mode("overwrite").save(path)
  import org.apache.spark.sql.functions.udf
  val filterIt = udf((value: Int) => value > 0).asNondeterministic
  spark.read.load(path)
.where(filterIt('id))
.where('id < 0)
.select('id)
.collect
}
  }
{code}
Error message:
{code}
Max iterations (100) reached for batch Operator Optimization before Inferring 
Filters, tree:
Filter (id#11 < 0)
+- Project [id#11]
   +- Filter if (isnull(id#11)) null else UDF(id#11)
  +- Relation[id#11,value#12] parquet

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations 
(100) reached for batch Operator Optimization before Inferring Filters, tree:
Filter (id#11 < 0)
+- Project [id#11]
   +- Filter if (isnull(id#11)) null else UDF(id#11)
  +- Relation[id#11,value#12] parquet

at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:117)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3286)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2745)
at org.apache.spark.sql.DataFrameSuite$$anonfun$94$$anonfun$apply$mcV$sp$199.apply(DataFrameSuite.scala:2343)
at org.apache.spark.sql.DataFrameSuite$$anonfun$94$$anonfun$apply$mcV$sp$199.apply(DataFrameSuite.scala:2333)
at org.apache.spark.sql.test.SQLTestUtilsBase$class.withTempDir(SQLTestUtils.scala:215)
at org.apache.spark.sql.DataFrameSuite.withTempDir(DataFrameSuite.scala:43)
at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply$mcV$sp(DataFrameSuite.scala:2333)
at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply(DataFrameSuite.scala:2333)
at org.apache.spark.sql.DataFrameSuite$$anonfun$94.apply(DataFrameSuite.scala:2333)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.apache.spark.sql.DataFrameSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(DataFrameSuite.scala:43)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
at org.apache.spark.sql.DataFrameSuite.runTest(DataFrameSuite.scala:43)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at 

[jira] [Updated] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-21 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24596:

Description: 
When invalidating a cache, we invalidate other caches that depend on it to 
ensure cached data is up to date. For example, when the underlying table has 
been modified or has been dropped, all caches that use this table should be 
invalidated or refreshed.

However, in other cases, like when a user simply wants to drop a cache to free up 
memory, we do not need to invalidate dependent caches since no underlying data 
has been changed. For this reason, we would like to introduce a new cache 
invalidation mode: non-cascading cache invalidation. And we choose between 
the existing mode and the new mode for the different cache invalidation scenarios:
 # Drop tables and regular (persistent) views: regular mode
 # Drop temporary views: non-cascading mode
 # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
 # Call {{DataSet.unpersist()}}: non-cascading mode
 # Call {{Catalog.uncacheTable()}}: follow the same convention as dropping 
tables/views, that is, use non-cascading mode for temporary views and regular 
mode for the rest

Note that a regular (persistent) view is a database object just like a table, 
so after dropping a regular view (whether cached or not), any query 
referring to that view should no longer be valid. Hence if a cached persistent 
view is dropped, we need to invalidate all the dependent caches so that 
exceptions will be thrown for any later reference. On the other hand, a 
temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
temporary view should have no impact on queries referencing that view. Thus we 
should do non-cascading uncaching for temporary views, which also guarantees 
consistent uncaching behavior between temporary views and unnamed DataSets.

  was:
When invalidating a cache, we invalid other caches dependent on this cache to 
ensure cached data is up to date. For example, when the underlying table has 
been modified or the table has been dropped itself, all caches that use this 
table should be invalidated or refreshed.

However, in other cases, like when user simply want to drop a cache to free up 
memory, we do not need to invalidate dependent caches since no underlying data 
has been changed. For this reason, we would like to introduce a new cache 
invalidation mode: the non-cascading cache invalidation. And we choose between 
the existing mode and the new mode for different cache invalidation scenarios:
 # Drop tables and regular (persistent) views: regular mode
 # Drop temporary views: non-cascading mode
 # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
 # Call DataSet.unpersist(): non-cascading mode

Note that a regular (persistent) view is a database object just like a table, 
so after dropping a regular view (whether cached or not cached), any query 
referring to that view should no long be valid. Hence if a cached persistent 
view is dropped, we need to invalidate the all dependent caches so that 
exceptions will be thrown for any later reference. On the other hand, a 
temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
temporary view should have no impact on queries referencing that view. Thus we 
should do non-cascading uncaching for temporary views, which also guarantees a 
consistent uncaching behavior between temporary views and unnamed DataSets.


> Non-cascading Cache Invalidation
> 
>
> Key: SPARK-24596
> URL: https://issues.apache.org/jira/browse/SPARK-24596
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> When invalidating a cache, we invalid other caches dependent on this cache to 
> ensure cached data is up to date. For example, when the underlying table has 
> been modified or the table has been dropped itself, all caches that use this 
> table should be invalidated or refreshed.
> However, in other cases, like when user simply want to drop a cache to free 
> up memory, we do not need to invalidate dependent caches since no underlying 
> data has been changed. For this reason, we would like to introduce a new 
> cache invalidation mode: the non-cascading cache invalidation. And we choose 
> between the existing mode and the new mode for different cache invalidation 
> scenarios:
>  # Drop tables and regular (persistent) views: regular mode
>  # Drop temporary views: non-cascading mode
>  # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
>  # Call {{DataSet.unpersist()}}: non-cascading mode
>  # Call {{Catalog.uncacheTable()}}: follow the same convention as drop 
> tables/view, which is, use 

[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches

2018-06-20 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24613:

Description: 
When caching a query, we generate its execution plan from the query's logical 
plan. However, the logical plan we get from the Dataset has already been 
analyzed, and when we try to get the execution plan, this already analyzed 
logical plan will be analyzed again in the new QueryExecution object. 
Unfortunately some rules have side effects if applied multiple times, which in 
this case is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has 
an extra null-check and can't be matched against the same plan. The following 
test would fail since {{df2}}'s execution plan inside the CacheManager does not 
depend on {{df1}}.
{code:java}
test("cache UDF result correctly 2") {
  val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))

  df.cache()
  df.count()
  df2.cache()

  // udf has been evaluated during caching, and thus should not be re-evaluated 
here
  failAfter(5 seconds) {
df2.collect()
  }
}
{code}

While it might be worth revisiting such analysis rules, we can also fix 
the CacheManager to avoid these potential problems.

  was:
When caching a query, we generate its execution plan from the query's logical 
plan. However, the logical plan we get from the Dataset has already been 
analyzed, and when we try the get the execution plan, this already analyzed 
logical plan will be analyzed again in the new QueryExecution object, and 
unfortunately some rules have side effects if applied multiple times, which in 
this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has 
an extra null-check and can't be matched against the same plan. The following 
test would fail since {{df2}}'s execution plan inside the CacheManager does not 
depend on {{df1}}.
{code:java}
test("cache UDF result correctly 2") {
  val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))

  df.cache()
  df.count()
  df2.cache()

  // udf has been evaluated during caching, and thus should not be re-evaluated 
here
  failAfter(5 seconds) {
df2.collect()
  }
}
{code}


> Cache with UDF could not be matched with subsequent dependent caches
> 
>
> Key: SPARK-24613
> URL: https://issues.apache.org/jira/browse/SPARK-24613
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Minor
> Fix For: 2.4.0
>
>
> When caching a query, we generate its execution plan from the query's logical 
> plan. However, the logical plan we get from the Dataset has already been 
> analyzed, and when we try the get the execution plan, this already analyzed 
> logical plan will be analyzed again in the new QueryExecution object, and 
> unfortunately some rules have side effects if applied multiple times, which 
> in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan 
> now has an extra null-check and can't be matched against the same plan. The 
> following test would fail since {{df2}}'s execution plan inside the 
> CacheManager does not depend on {{df1}}.
> {code:java}
> test("cache UDF result correctly 2") {
>   val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
>   val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
>   val df2 = df.agg(sum(df("b")))
>   df.cache()
>   df.count()
>   df2.cache()
>   // udf has been evaluated during caching, and thus should not be 
> re-evaluated here
>   failAfter(5 seconds) {
> df2.collect()
>   }
> }
> {code}
> While it might be worth re-visiting such analysis rules, we can make also fix 
> the CacheManager to avoid these potential problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches

2018-06-20 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24613:
---

 Summary: Cache with UDF could not be matched with subsequent 
dependent caches
 Key: SPARK-24613
 URL: https://issues.apache.org/jira/browse/SPARK-24613
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


When caching a query, we generate its execution plan from the query's logical 
plan. However, the logical plan we get from the Dataset has already been 
analyzed, and when we try to get the execution plan, this already analyzed 
logical plan will be analyzed again in the new QueryExecution object. 
Unfortunately some rules have side effects if applied multiple times, which in 
this case is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan now has 
an extra null-check and can't be matched against the same plan. The following 
test would fail since {{df2}}'s execution plan inside the CacheManager does not 
depend on {{df1}}.
{code:java}
test("cache UDF result correctly 2") {
  val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
  val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
  val df2 = df.agg(sum(df("b")))

  df.cache()
  df.count()
  df2.cache()

  // udf has been evaluated during caching, and thus should not be re-evaluated 
here
  failAfter(5 seconds) {
df2.collect()
  }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches

2018-06-20 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24613:

Priority: Minor  (was: Major)

> Cache with UDF could not be matched with subsequent dependent caches
> 
>
> Key: SPARK-24613
> URL: https://issues.apache.org/jira/browse/SPARK-24613
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Minor
> Fix For: 2.4.0
>
>
> When caching a query, we generate its execution plan from the query's logical 
> plan. However, the logical plan we get from the Dataset has already been 
> analyzed, and when we try the get the execution plan, this already analyzed 
> logical plan will be analyzed again in the new QueryExecution object, and 
> unfortunately some rules have side effects if applied multiple times, which 
> in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan 
> now has an extra null-check and can't be matched against the same plan. The 
> following test would fail since {{df2}}'s execution plan inside the 
> CacheManager does not depend on {{df1}}.
> {code:java}
> test("cache UDF result correctly 2") {
>   val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
>   val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
>   val df2 = df.agg(sum(df("b")))
>   df.cache()
>   df.count()
>   df2.cache()
>   // udf has been evaluated during caching, and thus should not be 
> re-evaluated here
>   failAfter(5 seconds) {
> df2.collect()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24613) Cache with UDF could not be matched with subsequent dependent caches

2018-06-20 Thread Maryann Xue (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24613:

Issue Type: Bug  (was: Improvement)

> Cache with UDF could not be matched with subsequent dependent caches
> 
>
> Key: SPARK-24613
> URL: https://issues.apache.org/jira/browse/SPARK-24613
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> When caching a query, we generate its execution plan from the query's logical 
> plan. However, the logical plan we get from the Dataset has already been 
> analyzed, and when we try the get the execution plan, this already analyzed 
> logical plan will be analyzed again in the new QueryExecution object, and 
> unfortunately some rules have side effects if applied multiple times, which 
> in this case, is the {{HandleNullInputsForUDF}} rule. The re-analyzed plan 
> now has an extra null-check and can't be matched against the same plan. The 
> following test would fail since {{df2}}'s execution plan inside the 
> CacheManager does not depend on {{df1}}.
> {code:java}
> test("cache UDF result correctly 2") {
>   val expensiveUDF = udf({x: Int => Thread.sleep(1); x})
>   val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
>   val df2 = df.agg(sum(df("b")))
>   df.cache()
>   df.count()
>   df2.cache()
>   // udf has been evaluated during caching, and thus should not be 
> re-evaluated here
>   failAfter(5 seconds) {
> df2.collect()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24596) Non-cascading Cache Invalidation

2018-06-19 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24596:
---

 Summary: Non-cascading Cache Invalidation
 Key: SPARK-24596
 URL: https://issues.apache.org/jira/browse/SPARK-24596
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


When invalidating a cache, we invalidate other caches that depend on it to 
ensure cached data is up to date. For example, when the underlying table has 
been modified or has been dropped, all caches that use this table should be 
invalidated or refreshed.

However, in other cases, like when a user simply wants to drop a cache to free up 
memory, we do not need to invalidate dependent caches since no underlying data 
has been changed. For this reason, we would like to introduce a new cache 
invalidation mode: non-cascading cache invalidation. And we choose between 
the existing mode and the new mode for the different cache invalidation scenarios:
 # Drop tables and regular (persistent) views: regular mode
 # Drop temporary views: non-cascading mode
 # Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
 # Call DataSet.unpersist(): non-cascading mode (illustrated in the sketch below)

Note that a regular (persistent) view is a database object just like a table, 
so after dropping a regular view (whether cached or not), any query 
referring to that view should no longer be valid. Hence if a cached persistent 
view is dropped, we need to invalidate all the dependent caches so that 
exceptions will be thrown for any later reference. On the other hand, a 
temporary view is in fact equivalent to an unnamed DataSet, and dropping a 
temporary view should have no impact on queries referencing that view. Thus we 
should do non-cascading uncaching for temporary views, which also guarantees 
consistent uncaching behavior between temporary views and unnamed DataSets.
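A hedged illustration of the intended behavior for case 4 above (not a test from 
this ticket); under the proposed change, unpersisting {{df}} should leave 
{{df2}}'s cache intact, since no underlying data has changed:
{code:java}
val df = spark.range(0, 100).toDF("id")
df.cache()
val df2 = df.groupBy().sum("id")
df2.cache()
df2.count()      // materializes both caches
df.unpersist()   // non-cascading: df2 is expected to stay cached after this change
{code}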



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24583) Wrong schema type in InsertIntoDataSourceCommand

2018-06-18 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24583:
---

 Summary: Wrong schema type in InsertIntoDataSourceCommand
 Key: SPARK-24583
 URL: https://issues.apache.org/jira/browse/SPARK-24583
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


For a DataSource table whose schema contains a field with "nullable=false", 
when a user tries to insert a NULL value into this field, the input DataFrame 
will return an incorrect value or throw a NullPointerException. That's 
because the schema nullability of the input relation is bluntly overridden 
with the destination schema by the code below in 
{{InsertIntoDataSourceCommand}}:
{code:java}
  override def run(sparkSession: SparkSession): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = Dataset.ofRows(sparkSession, query)
// Apply the schema of the existing table to the new data.
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, 
logicalRelation.schema)
relation.insert(df, overwrite)

// Re-cache all cached plans(including this relation itself, if it's 
cached) that refer to this
// data source relation.
sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, 
logicalRelation)

Seq.empty[Row]
  }
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24568) Code refactoring for DataType equalsXXX methods

2018-06-15 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24568:
---

 Summary: Code refactoring for DataType equalsXXX methods
 Key: SPARK-24568
 URL: https://issues.apache.org/jira/browse/SPARK-24568
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue
 Fix For: 2.4.0


Right now there is a lot of code duplication between the DataType equalsXXX 
methods: {{equalsIgnoreNullability}}, {{equalsIgnoreCaseAndNullability}}, 
{{equalsIgnoreCompatibleNullability}}, and {{equalsStructurally}}. We can replace the 
duplicated code with a helper function.
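A rough sketch of the kind of helper this could be (not the actual patch; 
nullability and field-name handling are simplified here and would be supplied 
by each caller):
{code:java}
import org.apache.spark.sql.types._

// Hypothetical helper: walks two DataTypes in lock-step and delegates the leaf
// comparison to the caller, so each equalsXXX variant only supplies its own predicate.
def compareTypes(
    left: DataType,
    right: DataType)(leafEquals: (DataType, DataType) => Boolean): Boolean =
  (left, right) match {
    case (ArrayType(le, _), ArrayType(re, _)) =>
      compareTypes(le, re)(leafEquals)
    case (MapType(lk, lv, _), MapType(rk, rv, _)) =>
      compareTypes(lk, rk)(leafEquals) && compareTypes(lv, rv)(leafEquals)
    case (StructType(leftFields), StructType(rightFields)) =>
      leftFields.length == rightFields.length &&
        leftFields.zip(rightFields).forall { case (f1, f2) =>
          compareTypes(f1.dataType, f2.dataType)(leafEquals)
        }
    case _ => leafEquals(left, right)
  }
{code}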



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-18 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480237#comment-16480237
 ] 

Maryann Xue commented on SPARK-24288:
-

Thank you for pointing this out, [~cloud_fan]! I made {{OptimizerBarrier}} 
inherit from {{UnaryNode}} as a proof of concept that could quickly pass the 
basic tests, but it was not the optimal solution. I've just created a PR, so 
you guys can all take a look.

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478534#comment-16478534
 ] 

Maryann Xue commented on SPARK-24288:
-

Yes, sure. This is more of a demonstration than a patch. So I'll clean up and 
add more tests and put together a PR if we decide to go down this path.

In general I think we should not break the barrier as long as things can work 
out correctly. It doesn't have to be optimal any more since it's the user's 
intention to force a barrier. So do you think self-join resolving would work 
alright without breaking the barrier, or would it be a correctness problem? 
Could you please point me to a test case?

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-24288:

Attachment: SPARK-24288.simple.patch

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477747#comment-16477747
 ] 

Maryann Xue commented on SPARK-24288:
-

Thank you, [~cloud_fan] and [~TomaszGaweda], for the input! Here are some ideas 
from my side:

Basically the question comes down to whether to 1) avoid predicate push-down at 
the data source level, or 2) avoid predicate push-down (or other push-down 
optimizations) anywhere in the logical operator tree.

For 1) we could have the first two solutions [~cloud_fan] has proposed or the 
hint solution [~smilegator] has proposed.

For 2) we would use a logical barrier as I have mentioned earlier.

I'm in favor of the hint approach because it would be an easy interface to use, 
though maybe less extensible. It can be used for the entire query or with the 
DataFrame interface as well. For example:

{{spark.sql("select /*+ NO_PRED_PUSHDOWN */ from foobar where name like 'x%'")}}

or

{{spark.jdbc(...).where("name like 'x%'").hint("NO_PRED_PUSHDOWN")}}

On the other hand, 2) is more powerful; for example, it can be used to push only 
part of the predicates down, or to push certain operators down while not others. 
And like I said, it might not require a whole lot of changes. I am attaching an 
experimental patch here as a quick illustration.

 

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476985#comment-16476985
 ] 

Maryann Xue edited comment on SPARK-24288 at 5/16/18 7:46 AM:
--

The special operator would just look like
 {{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}
 or
 {{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use while the barrier would be more 
general (regardless of data source types) and give more control to the user 
(can even do partial predicate push down). If we were to implement other 
operator push down (e.g., agg, limit), the barrier would make more sense in 
terms of controlling up to which level things can be pushed down. I'm more 
inclined to do hint now if that covers [~TomaszGaweda]'s use case.


was (Author: maryannxue):
The special operator would just look like
{{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}
or
{{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use while the barrier would be more 
general (regardless of data source types) and give more control to the user. If 
we were to implement other operator push down (e.g., agg, limit), the barrier 
would make more sense in terms of controlling up to which level things can be 
pushed down. I'm more inclined to do hint now if that covers [~TomaszGaweda]'s 
use case.

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476985#comment-16476985
 ] 

Maryann Xue commented on SPARK-24288:
-

The special operator would just look like
{{sql("SELECT * FROM foobar").withOptimizerBarrier().where("THEID = 1")}}
or
{{spark.jdbc(...).withOptimizerBarrier().where("THEID = 1")}}

I assume the hint would be easier to use while the barrier would be more 
general (regardless of data source types) and give more control to the user. If 
we were to implement other operator push down (e.g., agg, limit), the barrier 
would make more sense in terms of controlling up to which level things can be 
pushed down. I'm more inclined to do hint now if that covers [~TomaszGaweda]'s 
use case.

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-16 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476900#comment-16476900
 ] 

Maryann Xue commented on SPARK-24288:
-

After taking a closer look at the problem, I realized that we should probably 
define the scope of the problem first. The issue [~TomaszGaweda] encountered 
specifically is predicates being pushed down to the data source; while a more 
general topic is the isolation of plan optimization/transformation between 
operators, somewhat like the effect "cache()" would achieve.

1) So for the first specific purpose, i.e., avoid predicate push-down to the 
data source, we could have an option when initiating the {{DataSource}}, e.g., 
by calling  {{spark.read.option()}} or {{spark.jdbc()}} with options, etc. This 
might not cover all cases of DataSource creation, but would be a workable 
solution for users.
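For option 1), a rough sketch of what a read-option flavor might look like for 
the JDBC source; the option name {{pushDownPredicate}} below is only 
illustrative, not an existing or committed API:
{code:java}
// Hypothetical: disable filter push-down for this particular DataFrame only.
val df1 = spark.read
  .format("jdbc")
  .option("url", "jdbc:db2://host:50000/SAMPLE")
  .option("dbtable", "FOOBAR")
  .option("pushDownPredicate", "false")  // illustrative option name, not an existing API
  .load()

// Filters on df1 would then stay in Spark instead of being sent to the database.
df1.where("date1 > '2018-01-01' or x > 100").show()
{code}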

2) For the more general purpose (which means we can stop push-down at any 
level), we could add a special logical operator (let's call it OptimizerBarrier 
maybe) that serves as a barrier node when performing optimizations. Then we 
could simply ditch this node at the time of physical plan transformation.

I personally prefer the second solution, which serves a more general purpose 
and may require fewer code changes (I did a quick experiment and it worked). 
Please let me know your thoughts, [~smilegator], [~TomaszGaweda].

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with JDBC datasource I saw that many "or" clauses with 
> non-equality operators causes huge performance degradation of SQL query 
> to database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in production system caching many sources is a 
> waste of memory (especially is pipeline is long and I must do cache many 
> times).There are also few more workarounds, but it would be great if Spark 
> will support preventing predicate pushdown by user.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note, that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down, but some may be, but df2 should have all 
> predicates pushed down, even if target query joins df1 and df2. As far as I 
> understand Spark optimizer, if we use functions like `withAnalysisBarrier` 
> and put AnalysisBarrier explicitly in logical plan, then predicates won't be 
> pushed down on this particular DataFrames and PP will be still possible on 
> the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-15 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476372#comment-16476372
 ] 

Maryann Xue commented on SPARK-24288:
-

I agree with you, [~TomaszGaweda], that making this a special operator would be 
a better way than adding an option. It would be finer-grained both in terms of 
different DataFrames and in terms of different operators (for example, in the 
future, Spark may be able to push down other operators to the data source).

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC datasource I saw that many "or" clauses with 
> non-equality operators cause huge performance degradation of the SQL query 
> sent to the database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
> > 100)").show() // in the real application the predicates were built many 
> lines below, with many ANDs and ORs 
> If I use cache() before the where, there is no predicate pushdown of this 
> "where" clause. However, in a production system caching many sources is a 
> waste of memory (especially if the pipeline is long and I must cache many 
> times). There are also a few more workarounds, but it would be great if Spark 
> supported preventing predicate pushdown at the user's request.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that only some predicates 
> may be pushed down to df1, while df2 should have all predicates pushed 
> down, even if the target query joins df1 and df2. As far as I understand 
> the Spark optimizer, if we use functions like `withAnalysisBarrier` and put 
> an AnalysisBarrier explicitly in the logical plan, then predicates won't be 
> pushed down into that particular DataFrame and predicate pushdown will 
> still be possible on the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-05-15 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476368#comment-16476368
 ] 

Maryann Xue commented on SPARK-24288:
-

Feel free to assign this to me then, [~smilegator].

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC datasource I saw that many "or" clauses with 
> non-equality operators cause huge performance degradation of the SQL query 
> sent to the database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
> > 100)").show() // in the real application the predicates were built many 
> lines below, with many ANDs and ORs 
> If I use cache() before the where, there is no predicate pushdown of this 
> "where" clause. However, in a production system caching many sources is a 
> waste of memory (especially if the pipeline is long and I must cache many 
> times). There are also a few more workarounds, but it would be great if Spark 
> supported preventing predicate pushdown at the user's request.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that only some predicates 
> may be pushed down to df1, while df2 should have all predicates pushed 
> down, even if the target query joins df1 and df2. As far as I understand 
> the Spark optimizer, if we use functions like `withAnalysisBarrier` and put 
> an AnalysisBarrier explicitly in the logical plan, then predicates won't be 
> pushed down into that particular DataFrame and predicate pushdown will 
> still be possible on the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24164) Support column list as the pivot column in Pivot

2018-05-02 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24164:
---

 Summary: Support column list as the pivot column in Pivot
 Key: SPARK-24164
 URL: https://issues.apache.org/jira/browse/SPARK-24164
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue


This is part of a functionality extension to Pivot SQL support (SPARK-24035).

Currently, we only support a single column as the pivot column, while a query 
using a column list as the pivot column would look like:
{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR (course, year) IN (('dotNET', 2012), ('Java', 2013))
);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24163) Support "ANY" or sub-query for Pivot "IN" clause

2018-05-02 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24163:
---

 Summary: Support "ANY" or sub-query for Pivot "IN" clause
 Key: SPARK-24163
 URL: https://issues.apache.org/jira/browse/SPARK-24163
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue


This is part of a functionality extension to Pivot SQL support (SPARK-24035).

Currently, only literal values are allowed in the Pivot "IN" clause. To support ANY 
or a sub-query in the "IN" clause (examples of which are provided below), we 
need to enable evaluation of a sub-query before/during query analysis.
{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ANY
);{code}
{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN (
SELECT course FROM courses
WHERE region = 'AZ'
  )
);
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24162) Support aliased literal values for Pivot "IN" clause

2018-05-02 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-24162:
---

 Summary: Support aliased literal values for Pivot "IN" clause
 Key: SPARK-24162
 URL: https://issues.apache.org/jira/browse/SPARK-24162
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue


This is part of a functionality extension to Pivot SQL support (SPARK-24035).

When literal values are specified in the Pivot IN clause, it would be nice to allow 
aliases for those values so that the output column names can be customized. For 
example:
{code:java}
SELECT * FROM (
  SELECT year, course, earnings FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ('dotNET' as c1, 'Java' as c2)
);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql

2018-03-29 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419808#comment-16419808
 ] 

Maryann Xue commented on SPARK-20169:
-

[~smilegator], I think this is also caused by SPARK-23368, so could you please 
assign someone else to review https://github.com/apache/spark/pull/20613 if the 
original person still does not respond in a few days?

> Groupby Bug with Sparksql
> -
>
> Key: SPARK-20169
> URL: https://issues.apache.org/jira/browse/SPARK-20169
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Bin Wu
>Priority: Major
>
> We found a potential bug in the Catalyst optimizer, which cannot correctly 
> process "groupBy". You can reproduce it with the following simple example:
> =
> from pyspark.sql.functions import *
> #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
> e = spark.read.csv("graph.csv", header=True)
> r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
> r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
> jr = e.join(r1, 'src')
> jr.show()
> r2 = jr.groupBy('dst').count()
> r2.show()
> =
> FYI, "graph.csv" contains exactly the same data as the commented line.
> You can find that jr is:
> |src|dst|count|
> |  3|  1|1|
> |  1|  4|3|
> |  1|  3|3|
> |  1|  2|3|
> |  4|  1|1|
> |  2|  1|1|
> But, after the last groupBy, the 3 rows with dst = 1 are not grouped together:
> |dst|count|
> |  1|1|
> |  4|1|
> |  3|1|
> |  2|1|
> |  1|1|
> |  1|1|
> If we build jr directly from the raw data (commented line), this error will not 
> show up. So we suspect that there is a bug in the Catalyst optimizer when 
> multiple joins and groupBy's are being optimized.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql

2018-03-28 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418282#comment-16418282
 ] 

Maryann Xue commented on SPARK-20169:
-

This was related to the use of "withColumnRenamed", which adds a "Project" 
on top of the original relation. Later on, when getting the 
"outputPartitioning" of the "Project", it returned HashPartitioning("dst") 
while it should have returned HashPartitioning("src"), since the column had been 
renamed. As a result, an "Exchange" node was missing between the two outermost 
HashAggregate nodes, and that's why the aggregate result was not correct.
{code:java}
test("SPARK-20169") {
  val e = Seq((1, 2), (1, 3), (1, 4), (2, 1), (3, 1), (4, 1)).toDF("src", "dst")
  val r = Seq((1), (2), (3), (4)).toDF("src")
  val r1 = e.join(r, "src" :: 
Nil).groupBy("dst").count().withColumnRenamed("dst", "src")
  val jr = e.join(r1, "src" :: Nil)
  val r2 = jr.groupBy("dst").count
  r2.explain()
  r2.show
}
{code}

The physical plan resulted from this bug turned out to be
{code}
== Physical Plan ==
*(2) HashAggregate(keys=[dst#181], functions=[count(1)])
+- *(2) HashAggregate(keys=[dst#181], functions=[partial_count(1)])
   +- *(2) Project [dst#181]
  +- *(2) BroadcastHashJoin [src#180], [src#197], Inner, BuildLeft
 :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, 
int, false] as bigint)))
 :  +- LocalTableScan [src#180, dst#181]
 +- *(2) HashAggregate(keys=[dst#181], functions=[])
+- Exchange hashpartitioning(dst#181, 5)
   +- *(1) HashAggregate(keys=[dst#181], functions=[])
  +- *(1) Project [dst#181]
 +- *(1) BroadcastHashJoin [src#180], [src#187], Inner, 
BuildRight
:- LocalTableScan [src#180, dst#181]
+- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [src#187]
{code}

The correct physical plan should be
{code}
== Physical Plan ==
*(3) HashAggregate(keys=[dst#181], functions=[count(1)])
+- Exchange hashpartitioning(dst#181, 5)
   +- *(2) HashAggregate(keys=[dst#181], functions=[partial_count(1)])
  +- *(2) Project [dst#181]
 +- *(2) BroadcastHashJoin [src#180], [src#197], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, 
int, false] as bigint)))
:  +- LocalTableScan [src#180, dst#181]
+- *(2) HashAggregate(keys=[dst#181], functions=[])
   +- Exchange hashpartitioning(dst#181, 5)
  +- *(1) HashAggregate(keys=[dst#181], functions=[])
 +- *(1) Project [dst#181]
+- *(1) BroadcastHashJoin [src#180], [src#187], Inner, 
BuildRight
   :- LocalTableScan [src#180, dst#181]
   +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
  +- LocalTableScan [src#187]
{code}

This comes down to the same issue as SPARK-23368, and the PR for SPARK-23368 fixes it.
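
The shape of the fix, as I understand it, is to make ProjectExec rewrite its 
child's partitioning through the projection's aliases; a rough sketch of that 
idea (not the actual SPARK-23368 patch) would be:
{code:java}
// Rough sketch only: rewrite the child's HashPartitioning expressions through
// the projection's aliases, so a renamed column still reports the partitioning
// under its new name. Helper name and placement are assumptions.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

def projectedPartitioning(
    projectList: Seq[NamedExpression],
    childPartitioning: Partitioning): Partitioning = {
  // Map each child attribute to the attribute it is exposed as after the projection.
  val aliasMap = AttributeMap(projectList.collect {
    case a @ Alias(ref: AttributeReference, _) => (ref, a.toAttribute)
  })
  childPartitioning match {
    case HashPartitioning(exprs, numPartitions) =>
      HashPartitioning(
        exprs.map(_.transform {
          case ref: AttributeReference => aliasMap.getOrElse(ref, ref)
        }),
        numPartitions)
    case other => other
  }
}
{code}
With the partitioning rewritten to the renamed attribute, the outer aggregate on 
dst no longer appears co-partitioned with its input, so EnsureRequirements 
inserts the missing Exchange.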

> Groupby Bug with Sparksql
> -
>
> Key: SPARK-20169
> URL: https://issues.apache.org/jira/browse/SPARK-20169
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Bin Wu
>Priority: Major
>
> We found a potential bug in the Catalyst optimizer, which cannot correctly 
> process "groupBy". You can reproduce it with the following simple example:
> =
> from pyspark.sql.functions import *
> #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
> e = spark.read.csv("graph.csv", header=True)
> r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
> r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
> jr = e.join(r1, 'src')
> jr.show()
> r2 = jr.groupBy('dst').count()
> r2.show()
> =
> FYI, "graph.csv" contains exactly the same data as the commented line.
> You can find that jr is:
> |src|dst|count|
> |  3|  1|1|
> |  1|  4|3|
> |  1|  3|3|
> |  1|  2|3|
> |  4|  1|1|
> |  2|  1|1|
> But, after the last groupBy, the 3 rows with dst = 1 are not grouped together:
> |dst|count|
> |  1|1|
> |  4|1|
> |  3|1|
> |  2|1|
> |  1|1|
> |  1|1|
> If we build jr directly from the raw data (commented line), this error will not 
> show up. So we suspect that there is a bug in the Catalyst optimizer when 
> multiple joins and groupBy's are being optimized.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[jira] [Commented] (SPARK-23368) Avoid unnecessary Exchange or Sort after projection

2018-02-15 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366374#comment-16366374
 ] 

Maryann Xue commented on SPARK-23368:
-

[~cloud_fan], [~smilegator], Could you please help review this PR? Thanks in 
advance!

> Avoid unnecessary Exchange or Sort after projection
> ---
>
> Key: SPARK-23368
> URL: https://issues.apache.org/jira/browse/SPARK-23368
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Minor
>
> After column rename projection, the ProjectExec's outputOrdering and 
> outputPartitioning should reflect the projected columns as well. For example,
> {code:java}
> SELECT b1
> FROM (
> SELECT a a1, b b1
> FROM testData2
> ORDER BY a
> )
> ORDER BY a1{code}
> The inner query is ordered on a1 as well. If we had a rule to eliminate Sort 
> on sorted result, together with this fix, the order-by in the outer query 
> could have been optimized out.
>  
> Similarly, the below query
> {code:java}
> SELECT *
> FROM (
> SELECT t1.a a1, t2.a a2, t1.b b1, t2.b b2
> FROM testData2 t1
> LEFT JOIN testData2 t2
> ON t1.a = t2.a
> )
> JOIN testData2 t3
> ON a1 = t3.a{code}
> is equivalent to
> {code:java}
> SELECT *
> FROM testData2 t1
> LEFT JOIN testData2 t2
> ON t1.a = t2.a
> JOIN testData2 t3
> ON t1.a = t3.a{code}
> , so the unnecessary sorting and hash-partitioning that have been optimized 
> out for the second query should have been eliminated in the first query as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23368) Avoid unnecessary Exchange or Sort after projection

2018-02-13 Thread Maryann Xue (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-23368:

Summary: Avoid unnecessary Exchange or Sort after projection  (was: 
OutputOrdering and OutputPartitioning in ProjectExec should reflect the 
projected columns)

> Avoid unnecessary Exchange or Sort after projection
> ---
>
> Key: SPARK-23368
> URL: https://issues.apache.org/jira/browse/SPARK-23368
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Priority: Minor
>
> After column rename projection, the ProjectExec's outputOrdering and 
> outputPartitioning should reflect the projected columns as well. For example,
> {code:java}
> SELECT b1
> FROM (
> SELECT a a1, b b1
> FROM testData2
> ORDER BY a
> )
> ORDER BY a1{code}
> The inner query is ordered on a1 as well. If we had a rule to eliminate Sort 
> on sorted result, together with this fix, the order-by in the outer query 
> could have been optimized out.
>  
> Similarly, the below query
> {code:java}
> SELECT *
> FROM (
> SELECT t1.a a1, t2.a a2, t1.b b1, t2.b b2
> FROM testData2 t1
> LEFT JOIN testData2 t2
> ON t1.a = t2.a
> )
> JOIN testData2 t3
> ON a1 = t3.a{code}
> is equivalent to
> {code:java}
> SELECT *
> FROM testData2 t1
> LEFT JOIN testData2 t2
> ON t1.a = t2.a
> JOIN testData2 t3
> ON t1.a = t3.a{code}
> , so the unnecessary sorting and hash-partitioning that have been optimized 
> out for the second query should have been eliminated in the first query as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23368) OutputOrdering and OutputPartitioning in ProjectExec should reflect the projected columns

2018-02-08 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-23368:
---

 Summary: OutputOrdering and OutputPartitioning in ProjectExec 
should reflect the projected columns
 Key: SPARK-23368
 URL: https://issues.apache.org/jira/browse/SPARK-23368
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Maryann Xue


After column rename projection, the ProjectExec's outputOrdering and 
outputPartitioning should reflect the projected columns as well. For example,
{code:java}
SELECT b1
FROM (
SELECT a a1, b b1
FROM testData2
ORDER BY a
)
ORDER BY a1{code}
The inner query is ordered on a1 as well. If we had a rule to eliminate Sort on 
sorted result, together with this fix, the order-by in the outer query could 
have been optimized out.

 

Similarly, the below query
{code:java}
SELECT *
FROM (
SELECT t1.a a1, t2.a a2, t1.b b1, t2.b b2
FROM testData2 t1
LEFT JOIN testData2 t2
ON t1.a = t2.a
)
JOIN testData2 t3
ON a1 = t3.a{code}
is equivalent to
{code:java}
SELECT *
FROM testData2 t1
LEFT JOIN testData2 t2
ON t1.a = t2.a
JOIN testData2 t3
ON t1.a = t3.a{code}
, so the unnecessary sorting and hash-partitioning that have been optimized out 
for the second query should have been eliminated in the first query as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22266) The same aggregate function was evaluated multiple times

2017-10-13 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203105#comment-16203105
 ] 

Maryann Xue commented on SPARK-22266:
-

Thank you for the comment, [~maropu]! I think it is a problem of common 
subexpression elimination, but I feel this kind of CSE is better performed at a 
higher level rather than in code-gen. PhysicalAggregation is designed to pull 
out the aggregate functions from the result expressions so that later, at the 
code-gen (and non-code-gen) stage, HashAggregate can assume that the aggregate 
expression list contains only pure aggregate functions and no other expressions. 
So if CSE in code-gen were to handle this, we would end up with extra 
non-aggregate expressions in the "aggregate expression" list.
I am not sure whether we should have a logical/physical optimization dedicated 
to CSE, but I think it would be nice. I applied a simple, straightforward fix in 
the PR; the gist of the idea is sketched below. Could you please review? Thank 
you in advance!
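
A loose sketch of that idea (assumed helper, written from memory, not the actual 
patch): keep one representative per semantically equal aggregate and rewrite the 
result expressions to reference that single copy.
{code:java}
// Loose sketch only: deduplicate aggregate expressions by their canonicalized
// (semantically equal) form and rewrite result expressions to the representative.
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

def deduplicateAggregates(
    resultExpressions: Seq[NamedExpression])
  : (Seq[AggregateExpression], Seq[NamedExpression]) = {
  // One representative per canonicalized aggregate expression.
  val representatives = mutable.LinkedHashMap.empty[Expression, AggregateExpression]
  resultExpressions.foreach(_.foreach {
    case ae: AggregateExpression => representatives.getOrElseUpdate(ae.canonicalized, ae)
    case _ =>
  })
  // Rewrite every occurrence to point at the single representative copy.
  val rewritten = resultExpressions.map(_.transform {
    case ae: AggregateExpression => representatives(ae.canonicalized)
  }.asInstanceOf[NamedExpression])
  (representatives.values.toSeq, rewritten)
}
{code}
With the two {{max((b#24 + 1))}} occurrences mapped to one copy, each 
HashAggregate would carry a single aggregate function, and the result expression 
{{max((b + 1)) + 1}} would simply reference it.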

> The same aggregate function was evaluated multiple times
> 
>
> Key: SPARK-22266
> URL: https://issues.apache.org/jira/browse/SPARK-22266
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Maryann Xue
>Priority: Minor
>
> We should avoid the same aggregate function being evaluated more than once, 
> and this is what is stated in the code comment below 
> (patterns.scala:206). However, things didn't work as expected.
> {code}
>   // A single aggregate expression might appear multiple times in 
> resultExpressions.
>   // In order to avoid evaluating an individual aggregate function 
> multiple times, we'll
>   // build a set of the distinct aggregate expressions and build a 
> function which can
>   // be used to re-write expressions so that they reference the single 
> copy of the
>   // aggregate function which actually gets computed.
> {code}
> For example, the physical plan of
> {code}
> SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a
> {code}
> was
> {code}
> HashAggregate(keys=[a#23], functions=[max((b#24 + 1)), max((b#24 + 1))], 
> output=[a#23, max((b + 1))#223, (max((b + 1)) + 1)#224])
> +- HashAggregate(keys=[a#23], functions=[partial_max((b#24 + 1)), 
> partial_max((b#24 + 1))], output=[a#23, max#231, max#232])
>+- SerializeFromObject [assertnotnull(input[0, 
> org.apache.spark.sql.test.SQLTestData$TestData2, true]).a AS a#23, 
> assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, 
> true]).b AS b#24]
>   +- Scan ExternalRDDScan[obj#22]
> {code}
> , where in each HashAggregate there were two identical aggregate functions 
> "max(b#24 + 1)".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22266) The same aggregate function was evaluated multiple times

2017-10-12 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-22266:
---

 Summary: The same aggregate function was evaluated multiple times
 Key: SPARK-22266
 URL: https://issues.apache.org/jira/browse/SPARK-22266
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Maryann Xue
Priority: Minor


We should avoid the same aggregate function being evaluated more than once, and 
this is what is stated in the code comment below (patterns.scala:206). 
However, things didn't work as expected.
{code}
  // A single aggregate expression might appear multiple times in 
resultExpressions.
  // In order to avoid evaluating an individual aggregate function multiple 
times, we'll
  // build a set of the distinct aggregate expressions and build a function 
which can
  // be used to re-write expressions so that they reference the single copy 
of the
  // aggregate function which actually gets computed.
{code}
For example, the physical plan of
{code}
SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a
{code}
was
{code}
HashAggregate(keys=[a#23], functions=[max((b#24 + 1)), max((b#24 + 1))], 
output=[a#23, max((b + 1))#223, (max((b + 1)) + 1)#224])
+- HashAggregate(keys=[a#23], functions=[partial_max((b#24 + 1)), 
partial_max((b#24 + 1))], output=[a#23, max#231, max#232])
   +- SerializeFromObject [assertnotnull(input[0, 
org.apache.spark.sql.test.SQLTestData$TestData2, true]).a AS a#23, 
assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, 
true]).b AS b#24]
  +- Scan ExternalRDDScan[obj#22]
{code}
, where in each HashAggregate there were two identical aggregate functions 
"max(b#24 + 1)".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

2017-09-19 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172169#comment-16172169
 ] 

Maryann Xue commented on SPARK-21998:
-

Thanks again for your comment, [~maropu]! I changed the title and description 
of this JIRA accordingly and created a PR as 
https://github.com/apache/spark/pull/19281. Could you please take a look?

> SortMergeJoinExec did not calculate its outputOrdering correctly during 
> physical planning
> -
>
> Key: SPARK-21998
> URL: https://issues.apache.org/jira/browse/SPARK-21998
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Maryann Xue
>Priority: Minor
>
> Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
> fact that its children have already been sorted on the join keys, while this 
> is often not true until EnsureRequirements has been applied.
> {code}
>   /**
>* For SMJ, child's output must have been sorted on key or expressions with 
> the same order as
>* key, so we can get ordering for key from child's output ordering.
>*/
>   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
> Seq[SortOrder])
> : Seq[SortOrder] = {
> keys.zip(childOutputOrdering).map { case (key, childOrder) =>
>   SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
> childOrder.child - key)
> }
>   }
> {code}
> Thus SortMergeJoinExec's outputOrdering is most likely not correct during the 
> physical planning stage, and as a result, potential physical optimizations 
> that rely on the required/output orderings, like SPARK-18591, will not work 
> for SortMergeJoinExec.
> The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be:
> 1. If the childOutputOrdering satisfies (is a superset of) the required child 
> ordering => childOutputOrdering
> 2. Otherwise => required child ordering



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

2017-09-19 Thread Maryann Xue (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-21998:

Description: 
Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
fact that its children have already been sorted on the join keys, while this is 
often not true until EnsureRequirements has been applied.
{code}
  /**
   * For SMJ, child's output must have been sorted on key or expressions with 
the same order as
   * key, so we can get ordering for key from child's output ordering.
   */
  private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
Seq[SortOrder])
: Seq[SortOrder] = {
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
  SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
}
  }
{code}
Thus SortMergeJoinExec's outputOrdering is most likely not correct during the 
physical planning stage, and as a result, potential physical optimizations that 
rely on the required/output orderings, like SPARK-18591, will not work for 
SortMergeJoinExec.
The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be:
1. If the childOutputOrdering satisfies (is a superset of) the required child 
ordering => childOutputOrdering
2. Otherwise => required child ordering

  was:
Right now SortMergeJoinExec calculates its outputOrdering based on its 
children's outputOrdering, thus oftentimes the SortMergeJoinExec's 
outputOrdering is NOT correct until after EnsureRequirements, which happens at 
a rather late stage. As a result, potential optimizations that rely on the 
required/output orderings, like SPARK-18591, will not work for 
SortMergeJoinExec.
Unlike operators like Project or Filter, which simply preserve the ordering of 
their inputs, the SortMergeJoinExec has a behavior that generates a new 
ordering in its output regardless of the orderings of its children. I think the 
code below together with its comment is buggy.
{code}
  /**
   * For SMJ, child's output must have been sorted on key or expressions with 
the same order as
   * key, so we can get ordering for key from child's output ordering.
   */
  private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
Seq[SortOrder])
: Seq[SortOrder] = {
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
  SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
}
  }
{code}


> SortMergeJoinExec did not calculate its outputOrdering correctly during 
> physical planning
> -
>
> Key: SPARK-21998
> URL: https://issues.apache.org/jira/browse/SPARK-21998
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Maryann Xue
>Priority: Minor
>
> Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
> fact that its children have already been sorted on the join keys, while this 
> is often not true until EnsureRequirements has been applied.
> {code}
>   /**
>* For SMJ, child's output must have been sorted on key or expressions with 
> the same order as
>* key, so we can get ordering for key from child's output ordering.
>*/
>   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
> Seq[SortOrder])
> : Seq[SortOrder] = {
> keys.zip(childOutputOrdering).map { case (key, childOrder) =>
>   SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
> childOrder.child - key)
> }
>   }
> {code}
> Thus SortMergeJoinExec's outputOrdering is most likely not correct during the 
> physical planning stage, and as a result, potential physical optimizations 
> that rely on the required/output orderings, like SPARK-18591, will not work 
> for SortMergeJoinExec.
> The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be:
> 1. If the childOutputOrdering satisfies (is a superset of) the required child 
> ordering => childOutputOrdering
> 2. Otherwise => required child ordering



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21998) SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

2017-09-19 Thread Maryann Xue (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-21998:

Summary: SortMergeJoinExec did not calculate its outputOrdering correctly 
during physical planning  (was: SortMergeJoinExec should calculate its 
outputOrdering independent of its children's outputOrdering)

> SortMergeJoinExec did not calculate its outputOrdering correctly during 
> physical planning
> -
>
> Key: SPARK-21998
> URL: https://issues.apache.org/jira/browse/SPARK-21998
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Maryann Xue
>Priority: Minor
>
> Right now SortMergeJoinExec calculates its outputOrdering based on its 
> children's outputOrdering, thus oftentimes the SortMergeJoinExec's 
> outputOrdering is NOT correct until after EnsureRequirements, which happens 
> at a rather late stage. As a result, potential optimizations that rely on the 
> required/output orderings, like SPARK-18591, will not work for 
> SortMergeJoinExec.
> Unlike operators like Project or Filter, which simply preserve the ordering 
> of their inputs, the SortMergeJoinExec has a behavior that generates a new 
> ordering in its output regardless of the orderings of its children. I think 
> the code below together with its comment is buggy.
> {code}
>   /**
>* For SMJ, child's output must have been sorted on key or expressions with 
> the same order as
>* key, so we can get ordering for key from child's output ordering.
>*/
>   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
> Seq[SortOrder])
> : Seq[SortOrder] = {
> keys.zip(childOutputOrdering).map { case (key, childOrder) =>
>   SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
> childOrder.child - key)
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21998) SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering

2017-09-13 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165765#comment-16165765
 ] 

Maryann Xue commented on SPARK-21998:
-

Thank you for pointing this out, [~maropu]! Yes, you are right.
It would be more accurate to say that the outputOrdering of SortMergeJoinExec 
should be a superset of (i.e., satisfy) the join key ordering.
Let's look at {{SELECT * FROM t1 JOIN t2 ON t1.a = t2.z}} for example and focus 
on {{t1}} for simplicity:
1) If t1 is not sorted on a, the SMJ outputOrdering should be (a, ASC)
2) If t1 is sorted on a, the SMJ outputOrdering should be (a, ASC)
3) If t1 is sorted on both a and b, the SMJ outputOrdering should at least 
include (a, ASC); whether the SMJ output is still sorted on b remains 
implementation specific, and in the current Spark SQL implementation it surely is. 
But we can imagine a less smart SMJ implementation which cannot avoid an extra 
sort on t1 and meanwhile uses an unstable sort, which can destroy the sortedness 
of column b.

Anyway, my point is that the current SMJ implementation will return NOTHING as 
the outputOrdering when the child ordering is not immediately available, and can 
only return the right value at a later stage, after EnsureRequirements happens. 
You can try my query example in 
https://issues.apache.org/jira/browse/SPARK-18591?focusedCommentId=16165148=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16165148
 with your proposed change for SPARK-18591 and see what's going on.
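
To make the intended behavior concrete, here is a rough sketch of what 
getKeyOrdering could do (my reading of the fix, with the satisfaction check 
written inline; it is meant to live inside SortMergeJoinExec, and is not 
necessarily the exact shape of the PR):
{code:java}
// Rough sketch only: prefer the child's ordering when it already satisfies the
// ordering required on the join keys, otherwise fall back to the required one.
private def getKeyOrdering(
    keys: Seq[Expression],
    childOutputOrdering: Seq[SortOrder]): Seq[SortOrder] = {
  val requiredOrdering = keys.map(SortOrder(_, Ascending))
  val childSatisfiesRequired =
    childOutputOrdering.length >= requiredOrdering.length &&
      requiredOrdering.zip(childOutputOrdering).forall { case (required, child) =>
        child.direction == required.direction &&
          (child.child.semanticEquals(required.child) ||
            child.sameOrderExpressions.exists(_.semanticEquals(required.child)))
      }
  if (childSatisfiesRequired) {
    // Keep the key as the ordering expression but carry over the child's
    // same-order expressions, as the existing code does.
    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
    }
  } else {
    requiredOrdering
  }
}
{code}
This way the outputOrdering is already correct during planning, instead of only 
becoming correct after EnsureRequirements has sorted the children.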

> SortMergeJoinExec should calculate its outputOrdering independent of its 
> children's outputOrdering
> --
>
> Key: SPARK-21998
> URL: https://issues.apache.org/jira/browse/SPARK-21998
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Maryann Xue
>Priority: Minor
>
> Right now SortMergeJoinExec calculates its outputOrdering based on its 
> children's outputOrdering, thus oftentimes the SortMergeJoinExec's 
> outputOrdering is NOT correct until after EnsureRequirements, which happens 
> at a rather late stage. As a result, potential optimizations that rely on the 
> required/output orderings, like SPARK-18591, will not work for 
> SortMergeJoinExec.
> Unlike operators like Project or Filter, which simply preserve the ordering 
> of their inputs, the SortMergeJoinExec has a behavior that generates a new 
> ordering in its output regardless of the orderings of its children. I think 
> the code below together with its comment is buggy.
> {code}
>   /**
>* For SMJ, child's output must have been sorted on key or expressions with 
> the same order as
>* key, so we can get ordering for key from child's output ordering.
>*/
>   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
> Seq[SortOrder])
> : Seq[SortOrder] = {
> keys.zip(childOutputOrdering).map { case (key, childOrder) =>
>   SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
> childOrder.child - key)
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18591) Replace hash-based aggregates with sort-based ones if inputs already sorted

2017-09-13 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165148#comment-16165148
 ] 

Maryann Xue commented on SPARK-18591:
-

Hey, I came up with this idea of doing sort-aggregate on sorted input while 
debugging a performance issue, before I saw this JIRA. I tried 
implementing it and found that I couldn't do it during physical planning 
because it is top-down, so I ended up doing the same thing as 
https://github.com/maropu/spark/commit/32b716cf02dfe8cba5b08b2dc3297bc061156630#diff-7d06cf071190dcbeda2fed6b039ec5d0R55.
I totally agree with [~maropu] that we need to make the physical planning 
bottom-up and then we can solve this in a better way.
But aside from this, I found another issue with a slightly more sophisticated 
query example:
{{SELECT t1.a, count\(*\) FROM t1 JOIN t2 ON t1.a = t2.b GROUP BY t1.a}}
This would only work if we put the {{ReplaceSortAggregate}} rule after 
{{EnsureRequirements}} because SortMergeJoinExec's outputOrdering is not 
correct before EnsureRequirements happens. Please refer to SPARK-21998 for 
details.
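
For context, the shape of such a rule (placed after EnsureRequirements) might be 
roughly the following; this is only a sketch of the idea using Spark 2.2 field 
names, not the actual prototype:
{code:java}
// Sketch only: swap a hash aggregate for a sort aggregate when its child already
// delivers an ordering on the grouping keys. Not an actual Spark rule.
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}

object ReplaceSortAggregate extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case agg: HashAggregateExec if childSortedOnGroupingKeys(agg) =>
      SortAggregateExec(
        agg.requiredChildDistributionExpressions,
        agg.groupingExpressions,
        agg.aggregateExpressions,
        agg.aggregateAttributes,
        agg.initialInputBufferOffset,
        agg.resultExpressions,
        agg.child)
  }

  private def childSortedOnGroupingKeys(agg: HashAggregateExec): Boolean = {
    val required = agg.groupingExpressions.map(SortOrder(_, Ascending))
    val actual = agg.child.outputOrdering
    actual.length >= required.length &&
      required.zip(actual).forall { case (r, a) =>
        a.direction == r.direction && a.child.semanticEquals(r.child)
      }
  }
}
{code}
With the join query above, such a rule only sees the correct child ordering if 
SortMergeJoinExec reports it properly, which is exactly where SPARK-21998 comes in.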

> Replace hash-based aggregates with sort-based ones if inputs already sorted
> ---
>
> Key: SPARK-18591
> URL: https://issues.apache.org/jira/browse/SPARK-18591
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Takeshi Yamamuro
>
> Spark currently uses sort-based aggregates only in a limited condition: the 
> cases where Spark cannot use partial aggregates and hash-based ones.
> However, if the input ordering already satisfies the requirements of 
> sort-based aggregates, it seems the sort-based ones are faster than the 
> hash-based ones.
> {code}
> ./bin/spark-shell --conf spark.sql.shuffle.partitions=1
> val df = spark.range(1000).selectExpr("id AS key", "id % 10 AS 
> value").sort($"key").cache
> def timer[R](block: => R): R = {
>   val t0 = System.nanoTime()
>   val result = block
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + ((t1 - t0) / 1e9) + "s")
>   result
> }
> timer {
>   df.groupBy("key").count().count
> }
> // codegen'd hash aggregate
> Elapsed time: 7.116962977s
> // non-codegen'd sort aggregate
> Elapsed time: 3.088816662s
> {code}
> If codegen'd sort-based aggregates are supported in SPARK-16844, this seems 
> to make the performance gap bigger;
> {code}
> - codegen'd sort aggregate
> Elapsed time: 1.645234684s
> {code} 
> Therefore, it'd be better to use sort-based ones in this case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21998) SortMergeJoinExec should calculate its outputOrdering independent of its children's outputOrdering

2017-09-13 Thread Maryann Xue (JIRA)
Maryann Xue created SPARK-21998:
---

 Summary: SortMergeJoinExec should calculate its outputOrdering 
independent of its children's outputOrdering
 Key: SPARK-21998
 URL: https://issues.apache.org/jira/browse/SPARK-21998
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Maryann Xue
Priority: Minor


Right now SortMergeJoinExec calculates its outputOrdering based on its 
children's outputOrdering, thus oftentimes the SortMergeJoinExec's 
outputOrdering is NOT correct until after EnsureRequirements, which happens at 
a rather late stage. As a result, potential optimizations that rely on the 
required/output orderings, like SPARK-18591, will not work for 
SortMergeJoinExec.
Unlike operators like Project or Filter, which simply preserve the ordering of 
their inputs, the SortMergeJoinExec has a behavior that generates a new 
ordering in its output regardless of the orderings of its children. I think the 
code below together with its comment is buggy.
{code}
  /**
   * For SMJ, child's output must have been sorted on key or expressions with 
the same order as
   * key, so we can get ordering for key from child's output ordering.
   */
  private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
Seq[SortOrder])
: Seq[SortOrder] = {
keys.zip(childOutputOrdering).map { case (key, childOrder) =>
  SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
}
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org