[jira] [Reopened] (ARROW-9464) [Rust] [DataFusion] Physical plan refactor to support optimization rules and more efficient use of threads

2020-10-03 Thread Andy Grove (Jira)


 [ https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Grove reopened ARROW-9464:
---

There are subtasks that are not complete yet. Reopening this for 3.0.0.

> [Rust] [DataFusion] Physical plan refactor to support optimization rules and 
> more efficient use of threads
> --
>
> Key: ARROW-9464
> URL: https://issues.apache.org/jira/browse/ARROW-9464
> Project: Apache Arrow
>  Issue Type: Improvement
>  Components: Rust, Rust - DataFusion
>Reporter: Andy Grove
>Assignee: Andy Grove
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.0.0
>
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> I would like to propose a refactor of the physical/execution planning based 
> on the experience I have had in implementing distributed execution in 
> Ballista.
> This will likely need subtasks, but here is an overview of the changes I am
> proposing.
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify 
> its input and output partitioning needs, and then have an optimization rule 
> that can insert any repartitioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design 
> is based on Apache Spark.
>  
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option> {
>     None
> }
> /// Specifies the ordering requirements of all the children for this operator
> fn required_child_ordering(&self) -> Option>> {
>     None
> }
> {code}
> A good example of applying this rule is hash aggregation, where we perform a
> partial aggregate in parallel across partitions, then coalesce the results
> and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required
> for its children.
>  
>  
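
The partitioning half of the proposed optimization rule can be illustrated with a minimal Rust sketch of the hash aggregate example. This is a hypothetical, self-contained model, not DataFusion code. Partitioning is reduced to a partition count, and Distribution has only the two variants needed here. The operator structs (ScanExec, PartialAggregateExec, MergeExec, FinalAggregateExec), the name, children and with_new_children methods, and the ensure_distribution and describe functions are all invented for illustration. The final aggregate declares that it needs a single partition. Wherever its input has more than one partition, the rule inserts a merge step.

```rust
use std::sync::Arc;

// Hypothetical, simplified versions of the proposal's types: only the
// partition count is tracked, and only two distributions are modelled.
enum Partitioning {
    UnknownPartitioning(usize),
}

enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

trait ExecutionPlan {
    fn name(&self) -> &'static str;
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan>;
    // The two defaults below follow the proposal.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(0)
    }
    fn required_child_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }
}

struct ScanExec { partitions: usize } // leaf with a fixed partition count
struct PartialAggregateExec { input: Arc<dyn ExecutionPlan> } // per-partition aggregate
struct MergeExec { input: Arc<dyn ExecutionPlan> } // coalesces partitions into one
struct FinalAggregateExec { input: Arc<dyn ExecutionPlan> } // combines partial results

impl ExecutionPlan for ScanExec {
    fn name(&self) -> &'static str { "ScanExec" }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![] }
    fn with_new_children(&self, _: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
        Arc::new(ScanExec { partitions: self.partitions })
    }
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(self.partitions) }
}

impl ExecutionPlan for PartialAggregateExec {
    fn name(&self) -> &'static str { "PartialAggregateExec" }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] }
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
        Arc::new(PartialAggregateExec { input: c[0].clone() })
    }
    // Aggregates each input partition separately, so partitioning is preserved.
    fn output_partitioning(&self) -> Partitioning { self.input.output_partitioning() }
}

impl ExecutionPlan for MergeExec {
    fn name(&self) -> &'static str { "MergeExec" }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] }
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
        Arc::new(MergeExec { input: c[0].clone() })
    }
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) }
}

impl ExecutionPlan for FinalAggregateExec {
    fn name(&self) -> &'static str { "FinalAggregateExec" }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { vec![self.input.clone()] }
    fn with_new_children(&self, c: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
        Arc::new(FinalAggregateExec { input: c[0].clone() })
    }
    fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) }
    // All partial results must be in one partition before they are combined.
    fn required_child_distribution(&self) -> Distribution { Distribution::SinglePartition }
}

/// Hypothetical rule: rewrite children bottom-up, then wrap any child that
/// violates a SinglePartition requirement in a MergeExec.
fn ensure_distribution(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let required = plan.required_child_distribution();
    let children: Vec<Arc<dyn ExecutionPlan>> = plan
        .children()
        .into_iter()
        .map(ensure_distribution)
        .map(|child| match (&required, child.output_partitioning()) {
            (Distribution::SinglePartition, Partitioning::UnknownPartitioning(n)) if n != 1 => {
                Arc::new(MergeExec { input: child }) as Arc<dyn ExecutionPlan>
            }
            _ => child,
        })
        .collect();
    if children.is_empty() { plan } else { plan.with_new_children(children) }
}

/// Renders a plan as Name(child, ...) for inspection.
fn describe(plan: &Arc<dyn ExecutionPlan>) -> String {
    let children: Vec<String> = plan.children().iter().map(describe).collect();
    if children.is_empty() { plan.name().to_string() } else { format!("{}({})", plan.name(), children.join(", ")) }
}

fn main() {
    let scan: Arc<dyn ExecutionPlan> = Arc::new(ScanExec { partitions: 4 });
    let partial: Arc<dyn ExecutionPlan> = Arc::new(PartialAggregateExec { input: scan });
    let plan: Arc<dyn ExecutionPlan> = Arc::new(FinalAggregateExec { input: partial });
    println!("{}", describe(&ensure_distribution(plan)));
}
```

MergeExec itself reports one partition, so running the rule a second time leaves the plan unchanged. When the scan already produces a single partition, no merge is inserted at all.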

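A matching rule for ordering, based on the SortMergeExec example from the proposal, can be sketched in Rust as well. This is a hypothetical, self-contained illustration rather than the proposal's code. SortOrder, the small Plan enum, asc, satisfies and ensure_ordering are all invented names. A parent's required ordering is treated as satisfied when it is a prefix of the child's output ordering. Otherwise the rule inserts a Sort above that child.

```rust
// Hypothetical sort key; the proposal's concrete ordering type is not shown.
#[derive(Debug, Clone, PartialEq)]
struct SortOrder { column: String, ascending: bool }

fn asc(column: &str) -> SortOrder {
    SortOrder { column: column.to_string(), ascending: true }
}

// A hypothetical plan tree with just enough operators for the example.
#[derive(Debug, Clone)]
enum Plan {
    Scan { table: String, ordering: Option<Vec<SortOrder>> },
    Sort { input: Box<Plan>, order: Vec<SortOrder> },
    SortMergeJoin { left: Box<Plan>, right: Box<Plan>, key: String },
}

impl Plan {
    /// Counterpart of the proposal's output_ordering().
    fn output_ordering(&self) -> Option<Vec<SortOrder>> {
        match self {
            Plan::Scan { ordering, .. } => ordering.clone(),
            Plan::Sort { order, .. } => Some(order.clone()),
            Plan::SortMergeJoin { key, .. } => Some(vec![asc(key)]),
        }
    }
    /// Counterpart of required_child_ordering(): one entry per child.
    fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
        match self {
            Plan::SortMergeJoin { key, .. } => Some(vec![vec![asc(key)], vec![asc(key)]]),
            _ => None,
        }
    }
    fn children(&self) -> Vec<Plan> {
        match self {
            Plan::Scan { .. } => vec![],
            Plan::Sort { input, .. } => vec![input.as_ref().clone()],
            Plan::SortMergeJoin { left, right, .. } => vec![left.as_ref().clone(), right.as_ref().clone()],
        }
    }
    fn with_children(self, mut children: Vec<Plan>) -> Plan {
        match self {
            scan @ Plan::Scan { .. } => scan,
            Plan::Sort { order, .. } => Plan::Sort { input: Box::new(children.remove(0)), order },
            Plan::SortMergeJoin { key, .. } => {
                let right = Box::new(children.remove(1));
                let left = Box::new(children.remove(0));
                Plan::SortMergeJoin { left, right, key }
            }
        }
    }
    fn describe(&self) -> String {
        match self {
            Plan::Scan { table, .. } => format!("Scan({})", table),
            Plan::Sort { input, order } => {
                let cols: Vec<&str> = order.iter().map(|o| o.column.as_str()).collect();
                format!("Sort[{}]({})", cols.join(","), input.describe())
            }
            Plan::SortMergeJoin { left, right, key } => format!("SortMergeJoin[{}]({}, {})", key, left.describe(), right.describe()),
        }
    }
}

/// A requirement is met when it is a prefix of the child's actual ordering.
fn satisfies(actual: &Option<Vec<SortOrder>>, required: &[SortOrder]) -> bool {
    match actual {
        Some(actual) => actual.starts_with(required),
        None => required.is_empty(),
    }
}

/// Hypothetical rule: rewrite children bottom-up, then add a Sort above any
/// child whose output ordering does not meet the parent's requirement.
fn ensure_ordering(plan: Plan) -> Plan {
    let required = plan.required_child_ordering();
    let children: Vec<Plan> = plan
        .children()
        .into_iter()
        .map(ensure_ordering)
        .enumerate()
        .map(|(i, child)| match &required {
            Some(reqs) if !satisfies(&child.output_ordering(), &reqs[i]) => {
                Plan::Sort { input: Box::new(child), order: reqs[i].clone() }
            }
            _ => child,
        })
        .collect();
    plan.with_children(children)
}

fn main() {
    let left = Plan::Scan { table: "a".to_string(), ordering: Some(vec![asc("id")]) };
    let right = Plan::Scan { table: "b".to_string(), ordering: None };
    let join = Plan::SortMergeJoin { left: Box::new(left), right: Box::new(right), key: "id".to_string() };
    println!("{}", ensure_ordering(join).describe());
}
```

In this example only the unsorted right input gets a Sort. The left scan already reports ascending order on the join key, so it is left alone.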


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (ARROW-9464) [Rust] [DataFusion] Physical plan refactor to support optimization rules and more efficient use of threads

2020-08-23 Thread Andy Grove (Jira)



Andy Grove reopened ARROW-9464:
---

> [Rust] [DataFusion] Physical plan refactor to support optimization rules and 
> more efficient use of threads
> --
>
> Key: ARROW-9464
> URL: https://issues.apache.org/jira/browse/ARROW-9464
> Project: Apache Arrow
>  Issue Type: Improvement
>  Components: Rust, Rust - DataFusion
>Reporter: Andy Grove
>Assignee: Andy Grove
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> I would like to propose a refactor of the physical/execution planning based 
> on the experience I have had in implementing distributed execution in 
> Ballista.
> This will likely need subtasks, but here is an overview of the changes I am
> proposing.
> h3. *Introduce an enum to represent the physical plan*
> By wrapping the execution plan structs in an enum, we make it possible to
> build a tree representing the physical plan, just as we do with the logical
> plan. This makes it easy to print physical plans and to apply
> transformations to them.
> {code:java}
> pub enum PhysicalPlan {
>     /// Projection.
>     Projection(Arc),
>     /// Filter, a.k.a. predicate.
>     Filter(Arc),
>     /// Hash aggregate.
>     HashAggregate(Arc),
>     /// Performs a hash join of two child relations by first shuffling the data using the join keys.
>     ShuffledHashJoin(ShuffledHashJoinExec),
>     /// Performs a shuffle that will result in the desired partitioning.
>     ShuffleExchange(Arc),
>     /// Reads results from a ShuffleExchange.
>     ShuffleReader(Arc),
>     /// Scans a partitioned data source.
>     ParquetScan(Arc),
>     /// Scans an in-memory table.
>     InMemoryTableScan(Arc),
> }
> {code}
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify
> its input and output partitioning needs, and then have an optimization rule
> that can insert any repartitioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design 
> is based on Apache Spark.
>  
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option> {
>     None
> }
> /// Specifies the ordering requirements of all the children for this operator
> fn required_child_ordering(&self) -> Option>> {
>     None
> }
> {code}
> A good example of applying this rule is hash aggregation, where we perform a
> partial aggregate in parallel across partitions, then coalesce the results
> and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required
> for its children.
>  
>  
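
The benefit claimed for the enum proposed above, namely that plans can be printed and transformed as a tree, can be illustrated with a minimal Rust sketch. Everything here is an assumption for illustration. The operator structs are cut down to a few fields, and only three of the proposed variants are modelled. The Arc payload types are guesses, and pretty and remove_trivial_filters are hypothetical helpers rather than DataFusion APIs. The predicate is a plain string standing in for a real expression.

```rust
use std::sync::Arc;

// Hypothetical, heavily simplified operator structs; real ones would also
// carry schemas, expressions and execution logic.
struct ProjectionExec { columns: Vec<String>, input: PhysicalPlan }
struct FilterExec { predicate: String, input: PhysicalPlan }
struct InMemoryTableScanExec { table: String, partitions: usize }

// A cut-down version of the proposed enum, with Arc-wrapped operators.
#[derive(Clone)]
enum PhysicalPlan {
    Projection(Arc<ProjectionExec>),
    Filter(Arc<FilterExec>),
    InMemoryTableScan(Arc<InMemoryTableScanExec>),
}

impl PhysicalPlan {
    fn children(&self) -> Vec<&PhysicalPlan> {
        match self {
            PhysicalPlan::Projection(p) => vec![&p.input],
            PhysicalPlan::Filter(f) => vec![&f.input],
            PhysicalPlan::InMemoryTableScan(_) => vec![],
        }
    }

    fn label(&self) -> String {
        match self {
            PhysicalPlan::Projection(p) => format!("ProjectionExec: {}", p.columns.join(", ")),
            PhysicalPlan::Filter(f) => format!("FilterExec: {}", f.predicate),
            PhysicalPlan::InMemoryTableScan(s) => {
                format!("InMemoryTableScanExec: {} ({} partitions)", s.table, s.partitions)
            }
        }
    }

    /// Prints one operator per line, indenting each child under its parent.
    fn pretty(&self) -> String {
        fn walk(plan: &PhysicalPlan, depth: usize, out: &mut String) {
            out.push_str(&"  ".repeat(depth));
            out.push_str(&plan.label());
            out.push('\n');
            for child in plan.children() {
                walk(child, depth + 1, out);
            }
        }
        let mut out = String::new();
        walk(self, 0, &mut out);
        out
    }
}

/// Hypothetical transformation: drop filters whose predicate is literally "true".
fn remove_trivial_filters(plan: &PhysicalPlan) -> PhysicalPlan {
    match plan {
        PhysicalPlan::Filter(f) if f.predicate == "true" => remove_trivial_filters(&f.input),
        PhysicalPlan::Filter(f) => PhysicalPlan::Filter(Arc::new(FilterExec {
            predicate: f.predicate.clone(),
            input: remove_trivial_filters(&f.input),
        })),
        PhysicalPlan::Projection(p) => PhysicalPlan::Projection(Arc::new(ProjectionExec {
            columns: p.columns.clone(),
            input: remove_trivial_filters(&p.input),
        })),
        // Leaves are shared rather than copied: cloning only bumps the Arc count.
        PhysicalPlan::InMemoryTableScan(_) => plan.clone(),
    }
}

fn main() {
    let scan = PhysicalPlan::InMemoryTableScan(Arc::new(InMemoryTableScanExec {
        table: "t".to_string(),
        partitions: 2,
    }));
    let filter = PhysicalPlan::Filter(Arc::new(FilterExec { predicate: "true".to_string(), input: scan }));
    let plan = PhysicalPlan::Projection(Arc::new(ProjectionExec { columns: vec!["a".to_string()], input: filter }));
    print!("{}", plan.pretty());
    print!("{}", remove_trivial_filters(&plan).pretty());
}
```

Because the transformation matches on enum variants, the compiler flags any operator it forgets to handle. A trait-object-only design cannot offer that check.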


