[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-29 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r514788126



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   Super! I will close this one in favor of yours.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-28 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513699488



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   Cool! 
   
   This is what I use:
   
   ```bash
   git checkout master && \
   cargo bench --bench aggregate_query_sql && \
   git checkout merge && \
   cargo bench --bench aggregate_query_sql
   ```









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-28 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513696355



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   You are on the right track IMO. Good thinking going for the Hash first.









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-28 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513654504



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   At this point I am ashamed of the immoral amount of time I spent on this.
   
   I am getting convinced that I should have kept the other PR with the
   iterator. IMO that was easy to reason about (and there was already a 50%
   performance improvement) xD









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-26 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r512384366



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   I take it that this is by design: there is no guarantee that a task spawned
   by Tokio finishes unless we `.join()` it. A `tokio::spawn` is a future like
   any other future: we need to await it.
   
   At least the problem is well defined: we have a vector (over `parts`) of
   futures (`spawn`) whose result is a stream (of `record batches`), and we
   would like to convert this into a single stream of record batches.
   
   The solution IMO is to have a new adapter for this:
   
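   ```rust
   use std::pin::Pin;
   use std::task::{Context, Poll};

   use futures::Stream;
   use tokio::task::JoinHandle;

   // vec: one entry per part of the partition
   // future: the `tokio::spawn`
   // stream: the stream of records
   // NOTE: the type parameters below are an assumption based on the three
   // comments above.
   type Streams = Vec<JoinHandle<Result<SendableRecordBatchStream>>>;

   struct Adapter {}

   impl Adapter {
       pub fn new(it: Streams) -> Self {
           todo!()
       }
   }

   impl Stream for Adapter {
       type Item = Result<RecordBatch>;

       fn poll_next(
           mut self: Pin<&mut Self>,
           cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           // poll any of the tasks; if it is done, store the stream in `self`
           // and start pulling from the stream together with the other tasks
           todo!()
       }
   }
   ```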
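
A blocking, standard-library analogue of the adapter sketched above may help make the idea concrete. It is not the async `Stream` implementation: OS threads stand in for `tokio::spawn` tasks, and a boxed iterator of `u32` stands in for the stream of record batches. `BatchIter`, `pending`, `ready`, and `collect_finished` are hypothetical names introduced only for this illustration.

```rust
use std::collections::VecDeque;
use std::thread::JoinHandle;

// Stand-in for a stream of record batches (assumption: plain `u32` items).
type BatchIter = Box<dyn Iterator<Item = u32> + Send>;

struct Adapter {
    // Tasks that have not yet handed over their iterator.
    pending: Vec<JoinHandle<BatchIter>>,
    // Iterators whose task has finished and that may still hold items.
    ready: VecDeque<BatchIter>,
}

impl Adapter {
    fn new(handles: Vec<JoinHandle<BatchIter>>) -> Self {
        Adapter { pending: handles, ready: VecDeque::new() }
    }

    // Move the iterator of every finished task into `ready` without blocking.
    fn collect_finished(&mut self) {
        let mut i = 0;
        while i < self.pending.len() {
            if self.pending[i].is_finished() {
                let handle = self.pending.swap_remove(i);
                self.ready.push_back(handle.join().expect("task panicked"));
            } else {
                i += 1;
            }
        }
    }
}

impl Iterator for Adapter {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        loop {
            self.collect_finished();
            // Round-robin over the iterators that are already available.
            while let Some(mut it) = self.ready.pop_front() {
                if let Some(item) = it.next() {
                    self.ready.push_back(it);
                    return Some(item);
                }
                // An exhausted iterator is simply dropped here.
            }
            // Nothing is ready: block on one pending task, or finish.
            match self.pending.pop() {
                Some(handle) => {
                    self.ready.push_back(handle.join().expect("task panicked"))
                }
                None => return None,
            }
        }
    }
}
```

The consumer can start pulling items as soon as any one task has produced its iterator, and no task is dropped without being joined. An async version would replace the blocking `join` with polling the join handles inside `poll_next`.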









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-22 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510567189



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   Thanks @alamb. So, 
[this](https://github.com/jorgecarleitao/arrow/pull/18) fails with no 
partitions collected at the end.









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-22 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510227802



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   I thought that too!
   
   Until this morning, when I opened this PR and many tests failed. :/
   
   The issue is not that we want the errors; it is that we want to ensure that
   the threads finish. If we remove the `join_all`, we end up in the situation
   described here: https://stackoverflow.com/q/38957741/931303 (I think; thanks
   for bearing with me also ^_^)
   









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-22 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510134213



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+let (sender, receiver) = 
mpsc::unbounded::>();

Review comment:
   Good point.
   
   I actually started with a `bounded(1)`, but this (and other values) won't
   work because we need to `join_all` threads. Because there is no consumer to
   retrieve the items from the `receiver`, the threads block on sending and
   cannot be joined, so we wait indefinitely for the `join_all`.
   
   An alternative is to not `join_all`, but then we risk losing results if the
   main thread finishes first.
   
   I.e. if we bound the channel, we cannot `join_all` and thus we may lose
   threads. If we join all, the channel needs to be unbounded so that we can
   build the stream.
   
   IMO neither is good, as in both cases we are essentially waiting for all
   threads to finish before returning the stream, which I understand is not what
   we want.
   
   (I was hoping that some of you would know the solution for this.)
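
The trade-off above can be reproduced with the standard library alone. The following is a minimal sketch, assuming OS threads in place of Tokio tasks and `std::sync::mpsc::sync_channel` in place of the bounded `futures` channel; `merge_partitions` and the `(partition, batch)` tuples are hypothetical stand-ins for the real operator and its record batches. It shows one way out of the deadlock: drain the receiver first and join only afterwards.

```rust
use std::sync::mpsc;
use std::thread;

/// Merge the output of `partitions` producer threads through a bounded channel.
fn merge_partitions(partitions: usize, batches_per_partition: usize) -> Vec<(usize, usize)> {
    // Capacity 1: a producer blocks in `send` until the consumer takes an item.
    let (sender, receiver) = mpsc::sync_channel::<(usize, usize)>(1);

    let handles: Vec<thread::JoinHandle<()>> = (0..partitions)
        .map(|part_i| {
            let sender = sender.clone();
            thread::spawn(move || {
                for batch_i in 0..batches_per_partition {
                    // Blocks while the channel is full.
                    sender.send((part_i, batch_i)).unwrap();
                }
            })
        })
        .collect();

    // Drop the original sender so the receiver ends once all producers finish.
    drop(sender);

    // Consume first. Joining before this line would block forever as soon as
    // the producers hold more items than the channel capacity.
    let merged: Vec<(usize, usize)> = receiver.iter().collect();

    // Every producer has finished sending by now, so the joins return promptly.
    for handle in handles {
        handle.join().unwrap();
    }
    merged
}
```

Items from one partition arrive in the order they were sent, while items from different partitions interleave arbitrarily. Note that this sketch still collects everything before returning; returning the receiver itself as the stream would avoid that, at the cost of joining the producers elsewhere.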









[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

2020-10-22 Thread GitBox


jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r509907817



##
File path: rust/datafusion/src/physical_plan/merge.rs
##
@@ -103,37 +105,53 @@ impl ExecutionPlan for MergeExec {
 self.input.execute(0).await
 }
 _ => {
-let tasks = (0..input_partitions).map(|part_i| {
+// todo: buffer size should be configurable or dependent of 
metrics
+let (sender, receiver) = 
mpsc::channel::>(1);
+
+// spawn independent tasks whose resulting streams (of batches)
+// are sent to the channel for consumption.
+(0..input_partitions).for_each(|part_i| {
 let input = self.input.clone();
+let mut sender = sender.clone();
 tokio::spawn(async move {

Review comment:
   We need to join the handles of these tasks, or the main thread may finish
   before the spawned tasks do.
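
As a rough illustration of why the handles matter, here is a small sketch using standard-library threads rather than Tokio tasks (an assumption made to keep it self-contained). `run_partitions` and the `completed` counter are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawn one worker per partition and return how many have completed.
fn run_partitions(partitions: usize) -> usize {
    let completed = Arc::new(AtomicUsize::new(0));

    let handles: Vec<thread::JoinHandle<()>> = (0..partitions)
        .map(|_| {
            let completed = Arc::clone(&completed);
            thread::spawn(move || {
                completed.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    // Without these joins, the caller could read `completed` (or the process
    // could exit) before some workers have run.
    for handle in handles {
        handle.join().expect("worker panicked");
    }

    completed.load(Ordering::SeqCst)
}
```

With the joins in place, the returned count always equals the number of partitions; without them, it could be anything from zero up to that number.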




