[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2020-04-10 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-23128:

Parent: SPARK-31412
Issue Type: Sub-task  (was: Improvement)

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Assignee: Carson Wang
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-05-11 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23128:

Attachment: AdaptiveExecutioninBaidu.pdf

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-01-17 Thread Carson Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Carson Wang updated SPARK-23128:

Affects Version/s: (was: 2.2.1)
   2.3.0

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-01-17 Thread Carson Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Carson Wang updated SPARK-23128:

Description: 
SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
DAGScheduler, a new API is added to support submitting a single map stage.  The 
current implementation of adaptive execution in Spark SQL supports changing the 
reducer number at runtime. An Exchange coordinator is used to determine the 
number of post-shuffle partitions for a stage that needs to fetch shuffle data 
from one or multiple stages. The current implementation adds 
ExchangeCoordinator while we are adding Exchanges. However there are some 
limitations. First, it may cause additional shuffles that may decrease the 
performance. We can see this from EnsureRequirements rule when it adds 
ExchangeCoordinator.  Secondly, it is not a good idea to add 
ExchangeCoordinators while we are adding Exchanges because we don’t have a 
global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 3 
tables’ join in a single stage, the same ExchangeCoordinator should be used in 
three Exchanges but currently two separated ExchangeCoordinator will be added. 
Thirdly, with the current framework it is not easy to implement other features 
in adaptive execution flexibly like changing the execution plan and handling 
skewed join at runtime.

We'd like to introduce a new way to do adaptive execution in Spark SQL and 
address the limitations. The idea is described at 
[https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]

  was:
SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
DAGScheduler, a new API is added to support submitting a single map stage.  The 
current implementation of adaptive execution in Spark SQL supports changing the 
reducer number at runtime. An Exchange coordinator is used to determine the 
number of post-shuffle partitions for a stage that needs to fetch shuffle data 
from one or multiple stages. The current implementation adds 
ExchangeCoordinator while we are adding Exchanges. However there are some 
limitations. First, it may cause additional shuffles that may decrease the 
performance. We can see this from EnsureRequirements rule when it adds 
ExchangeCoordinator.  Secondly, it is not a good idea to add 
ExchangeCoordinators while we are adding Exchanges because we don’t have a 
global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 3 
tables’ join in a single stage, the same ExchangeCoordinator should be used in 
three Exchanges but currently two separated ExchangeCoordinator will be added. 
Thirdly, with the current framework it is not easy to implement other features 
in adaptive execution flexibly like changing the execution plan and handling 
skewed join at runtime.

We'd like to introduce QueryStage and a new way to do adaptive execution in 
Spark SQL and address the limitations. The idea is described at 
https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing


> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Carson Wang
>Priority: Major
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> 

[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-01-17 Thread Carson Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Carson Wang updated SPARK-23128:

Summary: A new approach to do adaptive execution in Spark SQL  (was: 
Introduce QueryStage to improve adaptive execution in Spark SQL)

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Carson Wang
>Priority: Major
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce QueryStage and a new way to do adaptive execution in 
> Spark SQL and address the limitations. The idea is described at 
> https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org