[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157350#comment-17157350 ]

Andy Grove commented on ARROW-9420:
---

I have created a new Jira to replace this one, since the changes I am proposing go a bit beyond simply changing how we manage threads.

https://issues.apache.org/jira/browse/ARROW-9464

> [Rust][DataFusion] Add repartition/shuffle plan
> ---
>
> Key: ARROW-9420
> URL: https://issues.apache.org/jira/browse/ARROW-9420
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Jorge
> Assignee: Jorge
> Priority: Major
> Labels: pull-request-available
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Some operations (group by, join, over(window.partition_by)) greatly benefit
> from hash partitioning.
> This is a proposal to add hash partitioning (based on an expression) to this
> library, so that optimizers can be written to optimize the plan based on the
> required hashing.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
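The hash partitioning proposed in the issue description can be illustrated with a minimal sketch. It uses only the Rust standard library; the function name `hash_partition`, the row-oriented `Vec<R>` input, and the closure standing in for the partitioning expression are all assumptions made for illustration and are not DataFusion APIs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Distribute `rows` over `num_partitions` buckets by hashing the value of a
/// key expression. Rows with equal keys always land in the same bucket, which
/// is the property that group by and join rely on.
///
/// Hypothetical helper for illustration only; not part of DataFusion.
fn hash_partition<R, K, F>(rows: Vec<R>, num_partitions: usize, key: F) -> Vec<Vec<R>>
where
    K: Hash,
    F: Fn(&R) -> K,
{
    assert!(num_partitions > 0, "at least one output partition is required");
    let mut partitions: Vec<Vec<R>> = (0..num_partitions).map(|_| Vec::new()).collect();
    for row in rows {
        // Evaluate the key expression for this row and hash the result.
        let mut hasher = DefaultHasher::new();
        key(&row).hash(&mut hasher);
        // Map the 64-bit hash onto a partition index.
        let index = (hasher.finish() % num_partitions as u64) as usize;
        partitions[index].push(row);
    }
    partitions
}
```

Calling `hash_partition(rows, 3, |row| row.0)` on a list of `(key, value)` tuples returns three buckets in which every key appears in exactly one bucket, so a downstream aggregate can process each bucket independently.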
[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157211#comment-17157211 ]

Jorge commented on ARROW-9420:
--

Thanks a lot for the clarification. Makes a lot of sense.

I am a bit unsure exactly how we are doing all of this, since all the parallelism in DataFusion is thread-based, while in a distributed environment we work at the inter-process level through some communication medium.

IMO, for Ballista to inter-operate with DataFusion, the API should not be based on SQL but on a smaller unit, such as intra-partition operations. Also, in that case, DataFusion should not have to worry about cross-partition operations, as they are normally planned differently in a distributed context.

Looking forward to the Jira issue. We currently use Vec as an iterator, which defeats the purpose of an iterator since we need to fit everything in memory anyway. In theory, record batches can be processed concurrently up to an exchange, and IMO one way to do this is to replace our `Vec` with an iterator of async tasks that are spawned in parallel.
[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156757#comment-17156757 ]

Andy Grove commented on ARROW-9420:
---

My ultimate goal is for all query evaluation to be in DataFusion, with Ballista just providing necessary additional optimizer rules to run the queries in a distributed manner.

DataFusion isn't yet mature enough to support this use case so currently Ballista fully re-uses the DataFusion logical plan and SQL planner, but implements its own physical + execution plans, forked from DataFusion.
[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156756#comment-17156756 ]

Andy Grove commented on ARROW-9420:
---

DataFusion is an in-memory single process query engine. Ballista is a distributed platform. Ballista uses Arrow and DataFusion.

Ballista is also where I try out many new ideas and contribute the ones that work out back to DataFusion. For example, I've been busy for a few months re-working the physical query plan to use async and I am planning on creating a Jira later this week to discuss making these changes in DataFusion.
[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156753#comment-17156753 ]

Jorge commented on ARROW-9420:
--

Yes, that was also my thinking.

What is the relationship between Ballista and DataFusion? It seems that they solve the same problem. Why not just stick to one of them?
[jira] [Commented] (ARROW-9420) [Rust][DataFusion] Add repartition/shuffle plan
[ https://issues.apache.org/jira/browse/ARROW-9420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156731#comment-17156731 ]

Andy Grove commented on ARROW-9420:
---

I don't think we would need any integration with SQL or the logical plan. IMO, only the physical plan should be concerned with partitioning.

The way I would see this working is that each operator in the physical plan knows what its output partitioning is, and also knows what input partitioning it requires from its children. Then we can have an optimizer rule that inserts shuffles where needed. This is the model used in Apache Spark and also in Ballista [1].

[1] https://github.com/ballista-compute/ballista/blob/main/rust/ballista/src/distributed/scheduler.rs#L401-L441
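The model described in the comment above (each physical operator reports its output partitioning and the input partitioning it requires, and an optimizer rule inserts shuffles where the two disagree) could look roughly like the following sketch. The `Plan` and `Partitioning` types, their variants, and the `insert_shuffles` function are hypothetical names invented for this illustration; they are not the DataFusion, Ballista, or Spark APIs.

```rust
/// How the rows produced by an operator are distributed across partitions.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// No guarantee about which partition holds which rows.
    Unknown(usize),
    /// Rows are hash-partitioned on the named columns.
    Hash(Vec<String>, usize),
}

/// A toy physical plan (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    HashAggregate { group_by: Vec<String>, input: Box<Plan> },
    Shuffle { partitioning: Partitioning, input: Box<Plan> },
}

impl Plan {
    /// The partitioning this operator produces.
    fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::Unknown(*partitions),
            // In this sketch an aggregate keeps the distribution of its input.
            Plan::HashAggregate { input, .. } => input.output_partitioning(),
            Plan::Shuffle { partitioning, .. } => partitioning.clone(),
        }
    }

    /// The partitioning this operator requires from its child, if any.
    fn required_input_partitioning(&self, partitions: usize) -> Option<Partitioning> {
        match self {
            Plan::HashAggregate { group_by, .. } => {
                Some(Partitioning::Hash(group_by.clone(), partitions))
            }
            _ => None,
        }
    }
}

/// Optimizer rule: rewrite the plan bottom-up and insert a `Shuffle` wherever
/// a child's output partitioning does not satisfy its parent's requirement.
fn insert_shuffles(plan: Plan, partitions: usize) -> Plan {
    let required = plan.required_input_partitioning(partitions);
    match plan {
        Plan::Scan { partitions } => Plan::Scan { partitions },
        Plan::HashAggregate { group_by, input } => {
            let child = insert_shuffles(*input, partitions);
            let input = match required {
                Some(req) if child.output_partitioning() != req => Plan::Shuffle {
                    partitioning: req,
                    input: Box::new(child),
                },
                _ => child,
            };
            Plan::HashAggregate { group_by, input: Box::new(input) }
        }
        Plan::Shuffle { partitioning, input } => Plan::Shuffle {
            partitioning,
            input: Box::new(insert_shuffles(*input, partitions)),
        },
    }
}
```

Applied to an aggregate over a plain scan, the rule wraps the scan in a `Shuffle` on the group-by columns. Applied to a plan that is already partitioned as required, it leaves the plan unchanged, so neither the SQL layer nor the logical plan needs to know about partitioning.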