[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-14 Thread Andy Grove (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157350#comment-17157350
 ] 

Andy Grove commented on ARROW-9420:
---

I have created a new Jira to replace this one, since the changes I am proposing
go a bit beyond simply changing how we manage threads.

 

https://issues.apache.org/jira/browse/ARROW-9464

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Assignee: Jorge
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.
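
For illustration, hash partitioning on an expression can be sketched as below. This is a minimal, std-only sketch and not DataFusion code: `hash_partition`, its generic row type, and the `key_expr` closure are hypothetical names, and a real implementation would operate on Arrow record batches rather than a `Vec` of rows.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not part of DataFusion): assigns each row to one of
/// `num_partitions` buckets by hashing the value that `key_expr` yields for it.
/// Rows with equal keys always land in the same bucket.
pub fn hash_partition<T, K, F>(rows: Vec<T>, num_partitions: usize, key_expr: F) -> Vec<Vec<T>>
where
    K: Hash,
    F: Fn(&T) -> K,
{
    assert!(num_partitions > 0, "need at least one partition");

    // One empty output bucket per partition.
    let mut partitions: Vec<Vec<T>> = (0..num_partitions).map(|_| Vec::new()).collect();

    for row in rows {
        // Evaluate the partitioning expression and hash its result.
        let mut hasher = DefaultHasher::new();
        key_expr(&row).hash(&mut hasher);

        // Map the 64-bit hash onto a partition index.
        let index = (hasher.finish() % num_partitions as u64) as usize;
        partitions[index].push(row);
    }
    partitions
}
```

Calling `hash_partition(rows, 3, |row| row.0)` on rows of type `(&str, i32)` would place every row sharing the same first field into the same one of three partitions, which is the property a group by or join on that field relies on.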



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-14 Thread Jorge (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157211#comment-17157211
 ] 

Jorge commented on ARROW-9420:
--

Thanks a lot for the clarification. Makes a lot of sense.

I am a bit unsure exactly how we are doing all of this, since all the
parallelism in DataFusion is thread-based, while in a distributed environment
we work at the inter-process level through some communication medium.

IMO for Ballista to inter-operate with DataFusion, the API should not be based 
on SQL, but a smaller unit like intra-partition operations.

Also, in that case, DataFusion should not have to worry about cross-partition 
operations as they are normally planned differently in a distributed context.

Looking forward to the Jira issue. We currently use Vec as an
iterator, which defeats the purpose of an iterator since we need to fit it all
in memory anyway. In theory, record batches can run concurrently up to an
exchange, and IMO one way to do this is to replace our `Vec` with an
iterator of async tasks that are spawned in parallel.

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Assignee: Jorge
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.





[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-13 Thread Andy Grove (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156757#comment-17156757
 ] 

Andy Grove commented on ARROW-9420:
---

My ultimate goal is for all query evaluation to be in DataFusion, with Ballista
just providing the additional optimizer rules needed to run the queries in a
distributed manner. DataFusion isn't yet mature enough to support this use case,
so currently Ballista fully re-uses the DataFusion logical plan and SQL
planner, but implements its own physical + execution plans, forked from
DataFusion.

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
>  Issue Type: Improvement
>  Components: Rust - DataFusion
>Reporter: Jorge
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.





[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-13 Thread Andy Grove (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156756#comment-17156756
 ] 

Andy Grove commented on ARROW-9420:
---

DataFusion is an in-memory, single-process query engine. Ballista is a
distributed platform. Ballista uses Arrow and DataFusion.

Ballista is also where I try out many new ideas and contribute the ones that 
work out back to DataFusion. For example, I've been busy for a few months 
re-working the physical query plan to use async and I am planning on creating a 
Jira later this week to discuss making these changes in DataFusion.

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
>  Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Priority: Major
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.





[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-13 Thread Jorge (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156753#comment-17156753
 ] 

Jorge commented on ARROW-9420:
--

Yes, that was also my thinking. 

What is the relationship between Ballista and DataFusion? It seems that they
solve the same problem. Why not just stick to one of them?

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
>  Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Priority: Major
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.





[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan

2020-07-13 Thread Andy Grove (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156731#comment-17156731
 ] 

Andy Grove commented on ARROW-9420:
---

I don't think we would need any integration with SQL or the logical plan. IMO,
only the physical plan should be concerned with partitioning.

The way I would see this working is that each operator in the physical plan 
knows what its output partitioning is, and also knows what the required input 
partitioning is for its children. Then we can have an optimizer rule that 
inserts shuffles where needed. This is the model used in Apache Spark and also 
in Ballista [1].

 
https://github.com/ballista-compute/ballista/blob/main/rust/ballista/src/distributed/scheduler.rs#L401-L441[1]
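
The model described above can be sketched roughly as follows. This is a toy, std-only illustration under stated assumptions, not the Ballista or Spark implementation: the `Partitioning` and `PhysicalPlan` enums, their variants, and the `insert_shuffles` function are all hypothetical names chosen for this example.

```rust
/// Hypothetical description of how rows are distributed across partitions.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    /// No particular distribution is guaranteed (or required).
    Unknown,
    /// Rows are distributed by the hash of the named columns.
    Hash(Vec<String>),
}

/// Hypothetical, heavily simplified physical plan.
#[derive(Debug, Clone, PartialEq)]
pub enum PhysicalPlan {
    Scan { table: String },
    HashAggregate { group_by: Vec<String>, input: Box<PhysicalPlan> },
    Shuffle { partitioning: Partitioning, input: Box<PhysicalPlan> },
}

impl PhysicalPlan {
    /// Each operator knows what its output partitioning is.
    pub fn output_partitioning(&self) -> Partitioning {
        match self {
            PhysicalPlan::Scan { .. } => Partitioning::Unknown,
            // In this sketch an aggregate keeps the partitioning of its input.
            PhysicalPlan::HashAggregate { input, .. } => input.output_partitioning(),
            PhysicalPlan::Shuffle { partitioning, .. } => partitioning.clone(),
        }
    }

    /// Each operator also knows what partitioning it requires from its child.
    pub fn required_input_partitioning(&self) -> Partitioning {
        match self {
            PhysicalPlan::HashAggregate { group_by, .. } => Partitioning::Hash(group_by.clone()),
            _ => Partitioning::Unknown,
        }
    }
}

/// Wraps `child` in a shuffle only if it does not already meet `required`.
fn satisfy(child: PhysicalPlan, required: Partitioning) -> PhysicalPlan {
    if required == Partitioning::Unknown || child.output_partitioning() == required {
        child
    } else {
        PhysicalPlan::Shuffle { partitioning: required, input: Box::new(child) }
    }
}

/// Optimizer rule: walk the plan bottom-up and insert shuffles where needed.
pub fn insert_shuffles(plan: PhysicalPlan) -> PhysicalPlan {
    let required = plan.required_input_partitioning();
    match plan {
        scan @ PhysicalPlan::Scan { .. } => scan,
        PhysicalPlan::HashAggregate { group_by, input } => PhysicalPlan::HashAggregate {
            group_by,
            input: Box::new(satisfy(insert_shuffles(*input), required)),
        },
        PhysicalPlan::Shuffle { partitioning, input } => PhysicalPlan::Shuffle {
            partitioning,
            input: Box::new(insert_shuffles(*input)),
        },
    }
}
```

In this sketch, running `insert_shuffles` on a `HashAggregate` grouped by `k` directly over a `Scan` yields the same aggregate with a `Shuffle` on `Hash(["k"])` placed between the two, and running the rule again on that result leaves it unchanged because the child already meets the requirement.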

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
>  Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Priority: Major
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit 
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the 
> required hashing.


