Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-18 Thread via GitHub


zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049125883


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        // Please pay attention that any operation inside of `in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need of handling
-        // reservation failure and spilling in the middle of the sort/merge. The memory
-        // space for batches produced by the resulting stream will be reserved by the
-        // consumer of the stream.
+        // Note, in theory in memory batches should have limited size, but some testing
+        // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we
+        // set a larger limit to avoid testing failure.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of `in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the need of handling
+            // reservation failure and spilling in the middle of the sort/merge. The memory
+            // space for batches produced by the resulting stream will be reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort

Review Comment:
   Thank you for the feedback @Dandandan,
   
   Now the logic is:
   
   1. When `self.expr.len() <= 2`, we merge all the in-memory batches into one batch using interleave, and don't send them to the merge stream.
   2. All other cases keep the original behavior.
   
   Because I can't see a performance gain for interleave with more than 2 sort columns in the benchmark results, I don't change the original concatenate path here.
   
   The remaining challenges are the test failures and the memory model. Because we return one single batch for case 1 above, many cases will have no partial merge, only the final merge, so the memory model will fail for some test cases.
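For readers following along: `build_sorted_indices` is not shown in this hunk. Below is a minimal std-only sketch of the idea, with plain `Vec<i32>` batches standing in for Arrow `RecordBatch`es and `(batch, row)` pairs standing in for the indices that `interleave_record_batch` consumes; the function name and types here are illustrative, not the PR's actual signature.

```rust
// Illustrative sketch only: the real PR sorts Arrow RecordBatches by sort
// expressions; here each "batch" is a Vec<i32> and the output is the list of
// (batch_index, row_index) pairs that an interleave kernel would consume.
fn build_sorted_indices(batches: &[Vec<i32>]) -> Vec<(usize, usize)> {
    // One (batch, row) entry per value across all in-memory batches.
    let mut indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, batch)| (0..batch.len()).map(move |r| (b, r)))
        .collect();
    // Sort globally by key, as if the batches had been concatenated first.
    indices.sort_by_key(|&(b, r)| batches[b][r]);
    indices
}

fn main() {
    let batches = vec![vec![5, 1, 9], vec![3, 7], vec![2]];
    let indices = build_sorted_indices(&batches);
    // Materializing through the indices yields one globally sorted "batch",
    // which is what interleave_record_batch does for real RecordBatches.
    let sorted: Vec<i32> = indices.iter().map(|&(b, r)| batches[b][r]).collect();
    assert_eq!(sorted, vec![1, 2, 3, 5, 7, 9]);
}
```

The intended win is that rows move once (during interleave) rather than twice (concat, then take), though as the thread notes the benefit was only observed with few sort columns.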



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049170187


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        // Please pay attention that any operation inside of `in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need of handling
-        // reservation failure and spilling in the middle of the sort/merge. The memory
-        // space for batches produced by the resulting stream will be reserved by the
-        // consumer of the stream.
+        // Note, in theory in memory batches should have limited size, but some testing
+        // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we
+        // set a larger limit to avoid testing failure.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of `in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the need of handling
+            // reservation failure and spilling in the middle of the sort/merge. The memory
+            // space for batches produced by the resulting stream will be reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort

Review Comment:
   Also, I am not sure why I can't see a performance gain for interleave with more than 2 sort columns in the benchmark results.
   
   This also needs investigation.








Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049110693


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
 
-        // Please pay attention that any operation inside of `in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need of handling
-        // reservation failure and spilling in the middle of the sort/merge. The memory
-        // space for batches produced by the resulting stream will be reserved by the
-        // consumer of the stream.
+        // Note, in theory in memory batches should have limited size, but some testing
+        // cases testing the memory limit use `sort_in_place_threshold_bytes` to, so here we
+        // set a larger limit to avoid testing failure.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of `in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the need of handling
+            // reservation failure and spilling in the middle of the sort/merge. The memory
+            // space for batches produced by the resulting stream will be reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort

Review Comment:
   Hmm... also for this case I think concatenating only the sort arrays, instead of whole batches, will generally be an optimization (probably as long as there are arrays besides the arrays-to-sort).
   
   Maybe we can combine `self.expr.len() <= 2` and `self.reservation.size() < self.sort_in_place_threshold_bytes` into one condition?
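A rough sketch of the "sort arrays instead of batches" suggestion, using plain Vecs in place of Arrow arrays. In DataFusion this would correspond to `concat` on the key columns, `lexsort_to_indices`, then `take`/`interleave` on every column; the function below is hypothetical.

```rust
// Hypothetical sketch: build the global sort order from the key column alone,
// then pull every payload column through the resulting (batch, row) indices.
// Only key values are inspected during the sort; payload columns are touched
// once, at materialization time.
fn sort_by_key_column(
    keys: &[Vec<i32>],
    payload: &[Vec<&'static str>],
) -> (Vec<i32>, Vec<&'static str>) {
    // Phase 1: "concat" only the key column, remembering (batch, row) origins.
    let mut order: Vec<(usize, usize)> = keys
        .iter()
        .enumerate()
        .flat_map(|(b, col)| (0..col.len()).map(move |r| (b, r)))
        .collect();
    // Phase 2: global argsort over the concatenated keys.
    order.sort_by_key(|&(b, r)| keys[b][r]);
    // Phase 3: reorder keys and payload through the same global indices.
    let sorted_keys = order.iter().map(|&(b, r)| keys[b][r]).collect();
    let sorted_payload = order.iter().map(|&(b, r)| payload[b][r]).collect();
    (sorted_keys, sorted_payload)
}

fn main() {
    let keys = vec![vec![3, 1], vec![2]];
    let payload = vec![vec!["c", "a"], vec!["b"]];
    let (k, p) = sort_by_key_column(&keys, &payload);
    assert_eq!(k, vec![1, 2, 3]);
    assert_eq!(p, vec!["a", "b", "c"]);
}
```

This is the shape of the argument above: the non-sort columns never need to be concatenated at all.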






Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048727270


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +668,81 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
 
         // If less than sort_in_place_threshold_bytes, concatenate and sort in place
         if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
+
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
+
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
+
+            metrics.record_output(sorted_batch.num_columns());
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+            drop(reservation);
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
+        }
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
Review Comment:
   I don't fully get the current code: why does it push a batch here, only to be used again in the next iteration?
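For reference, the grouping loop under discussion follows a common greedy pattern: flush the current group whenever the next batch would cross the threshold, then start a new group with that batch. A std-only sketch (sizes as plain `u64`, function name hypothetical; the PR uses `get_reserved_byte_for_record_batch` for the sizes and merges each flushed group):

```rust
// Hypothetical sketch of greedy grouping by a byte-size threshold: batches
// accumulate into `current` until adding the next one would exceed the
// threshold, at which point `current` is flushed as one group to be merged.
fn group_by_threshold(sizes: &[u64], threshold: u64) -> Vec<Vec<u64>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0u64;
    for &size in sizes {
        // Flush the current group before it would exceed the threshold.
        if current_size + size > threshold && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(size);
        current_size += size;
    }
    // The last partially filled group is flushed at the end.
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    let groups = group_by_threshold(&[4, 3, 5, 2, 6], 8);
    assert_eq!(groups, vec![vec![4, 3], vec![5, 2], vec![6]]);
}
```

The push in the quoted diff plays the same role as `current.push(size)` here: the batch that triggered the flush still has to join the next group.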






Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812614902

   Also no regression for small data set:
   
   ```text
   Benchmark sort_tpch1.json

   | Query | main     | concat_batches_for_sort | Change        |
   |-------|----------|-------------------------|---------------|
   | Q1    | 153.49ms | 144.12ms                | +1.07x faster |
   | Q2    | 131.29ms | 117.87ms                | +1.11x faster |
   | Q3    | 980.57ms | 986.92ms                | no change     |
   | Q4    | 252.25ms | 205.13ms                | +1.23x faster |
   | Q5    | 464.81ms | 464.38ms                | no change     |
   | Q6    | 481.44ms | 474.70ms                | no change     |
   | Q7    | 810.73ms | 705.06ms                | +1.15x faster |
   | Q8    | 498.10ms | 503.96ms                | no change     |
   | Q9    | 503.80ms | 530.64ms                | 1.05x slower  |
   | Q10   | 789.02ms | 705.37ms                | +1.12x faster |
   | Q11   | 417.39ms | 438.07ms                | no change     |

   | Benchmark Summary                       |           |
   |-----------------------------------------|-----------|
   | Total Time (main)                       | 5482.89ms |
   | Total Time (concat_batches_for_sort)    | 5276.20ms |
   | Average Time (main)                     | 498.44ms  |
   | Average Time (concat_batches_for_sort)  | 479.65ms  |
   | Queries Faster                          | 5         |
   | Queries Slower                          | 1         |
   | Queries with No Change                  | 5         |
   ```





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812607514

   ```text
   Benchmark sort_tpch10.json

   | Query | main       | concat_batches_for_sort | Change        |
   |-------|------------|-------------------------|---------------|
   | Q1    | 2243.52ms  | 1525.77ms               | +1.47x faster |
   | Q2    | 1842.11ms  | 1150.71ms               | +1.60x faster |
   | Q3    | 12446.31ms | 12498.09ms              | no change     |
   | Q4    | 4047.55ms  | 1949.27ms               | +2.08x faster |
   | Q5    | 4364.46ms  | 4334.64ms               | no change     |
   | Q6    | 4561.01ms  | 4538.98ms               | no change     |
   | Q7    | 8158.01ms  | 7720.56ms               | +1.06x faster |
   | Q8    | 6077.40ms  | 6010.15ms               | no change     |
   | Q9    | 6347.21ms  | 6406.01ms               | no change     |
   | Q10   | 11561.03ms | 9081.48ms               | +1.27x faster |
   | Q11   | 6069.42ms  | 4684.90ms               | +1.30x faster |

   | Benchmark Summary                       |            |
   |-----------------------------------------|------------|
   | Total Time (main)                       | 67718.04ms |
   | Total Time (concat_batches_for_sort)    | 59900.57ms |
   | Average Time (main)                     | 6156.19ms  |
   | Average Time (concat_batches_for_sort)  | 5445.51ms  |
   | Queries Faster                          | 6          |
   | Queries Slower                          | 0          |
   | Queries with No Change                  | 5          |
   ```
   
   Updated with the latest code results: when there are <= 2 sort columns, we do the sort in memory using interleave. There is no regression in this test.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812565082

   > So to change it in your diff (didn't change the documentation).
   > 
   > I would like to keep the original `StreamingMergeBuilder` case and the `if self.reservation.size() < self.sort_in_place_threshold_bytes` expression so that we only have the "avoid concat for non-sort columns" optimization in place and see if this improves on all sort queries.
   > 
   > ```diff
   > -// If less than sort_in_place_threshold_bytes, concatenate and 
sort in place
   > -if self.reservation.size() < self.sort_in_place_threshold_bytes {
   > -// Concatenate memory batches together and sort
   > -let batch = concat_batches(&self.schema, 
&self.in_mem_batches)?;
   > -self.in_mem_batches.clear();
   > -self.reservation
   > -.try_resize(get_reserved_byte_for_record_batch(&batch))
   > -.map_err(Self::err_with_oom_context)?;
   > -let reservation = self.reservation.take();
   > -return self.sort_batch_stream(batch, metrics, reservation);
   > +let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; self.expr.len()];
   > +for batch in &self.in_mem_batches {
   > +for (i, expr) in self.expr.iter().enumerate() {
   > +let col = expr.evaluate_to_sort_column(batch)?.values;
   > +columns_by_expr[i].push(col);
   > +}
   >  }
   >  
   > -let streams = std::mem::take(&mut self.in_mem_batches)
   > -.into_iter()
   > -.map(|batch| {
   > -let metrics = self.metrics.baseline.intermediate();
   > -let reservation = self
   > -.reservation
   > -.split(get_reserved_byte_for_record_batch(&batch));
   > -let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   > -Ok(spawn_buffered(input, 1))
   > -})
   > -.collect::<Result<_>>()?;
   > +// For each sort expression, concatenate arrays from all batches 
into one global array
   > +let mut sort_columns = Vec::with_capacity(self.expr.len());
   > +for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
   > +let array = concat(
   > +&arrays
   > +.iter()
   > +.map(|a| a.as_ref())
   > +.collect::<Vec<_>>(),
   > +)?;
   > +sort_columns.push(SortColumn {
   > +values: array,
   > +options: expr.options.into(),
   > +});
   > +}
   >  
   > -let expressions: LexOrdering = 
self.expr.iter().cloned().collect();
   > +// = Phase 2: Compute global sorted indices =
   > +// Use `lexsort_to_indices` to get global row indices in sorted 
order (as if all batches were concatenated)
   > +let indices = lexsort_to_indices(&sort_columns, None)?;
   >  
   > -StreamingMergeBuilder::new()
   > -.with_streams(streams)
   > -.with_schema(Arc::clone(&self.schema))
   > -.with_expressions(expressions.as_ref())
   > -.with_metrics(metrics)
   > -.with_batch_size(self.batch_size)
   > -.with_fetch(None)
   > -.with_reservation(self.merge_reservation.new_empty())
   > -.build()
   > +// = Phase 3: Reorder each column using the global sorted 
indices =
   > +let num_columns = self.schema.fields().len();
   > +
   > +let batch_indices: Vec<(usize, usize)> = self
   > +.in_mem_batches
   > +.iter()
   > +.enumerate()
   > +.map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| 
(batch_id, i)))
   > +.flatten()
   > +.collect();
   > +
   > +// For each column:
   > +// 1. Concatenate all batch arrays for this column (in the same 
order as assumed by `lexsort_to_indices`)
   > +// 2. Use Arrow's `take` function to reorder the column by sorted 
indices
   > +let interleave_indices: Vec<(usize, usize)> = indices
   > +.values()
   > +.iter()
   > +.map(|x| batch_indices[*x as usize])
   > +.collect();
   > +// Build a RecordBatch from the sorted columns
   > +
   > +let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
   > +
   > +let sorted_batch =
   > +interleave_record_batch(batches.as_ref(), 
&interleave_indices)?;
   > +// Clear in-memory batches and update reservation
   > +self.in_mem_batches.clear();
   > +self.reservation
   > +
.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
   > +let reservation = s
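Phase 2 in the diff above relies on arrow's `lexsort_to_indices` to produce one global ordering from the concatenated sort columns. A std-only sketch of what that call computes, assuming two integer key columns (the function below is a stand-in, not the arrow-ord API):

```rust
// Std-only stand-in for what `lexsort_to_indices` computes over two sort
// columns; the real implementation lives in the arrow-ord crate.
fn lexsort_to_indices(col_a: &[i64], col_b: &[i64]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..col_a.len()).collect();
    // Compare by the first key, then break ties with the second key.
    idx.sort_by(|&i, &j| col_a[i].cmp(&col_a[j]).then(col_b[i].cmp(&col_b[j])));
    idx
}

fn main() {
    // Two "concatenated" sort columns spanning all in-memory batches.
    let a = [2, 1, 2, 1];
    let b = [9, 8, 3, 5];
    println!("{:?}", lexsort_to_indices(&a, &b)); // [3, 1, 2, 0]
}
```

In the diff, each resulting index is then mapped through the flattened `batch_indices` table to recover a `(batch_id, row_id)` pair for `interleave_record_batch`.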

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812523777

   > > > Tested latest PR, it seems regression again for current code...
   > > 
   > > 
   > > Hmm... could it be because of the increased threshold? I think if we're 
strictly faster and keep the same heuristic we shouldn't have regressions?
   > 
   > I did not change the threshold. I found the result is not stable, but it is a guaranteed improvement for the sort-by-one-column case. For multi-column sorting, it does not improve much over the current code.
   
   I meant
   > pub sort_in_place_threshold_bytes: usize, default = 4 * 1024 * 1024
   
   seems to be changed?





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812539191

   > > > > Tested latest PR, it seems regression again for current code...
   > > > 
   > > > 
   > > > Hmm... could it be because of the increased threshold? I think if 
we're strictly faster and keep the same heuristic we shouldn't have regressions?
   > > 
   > > 
   > > I did not change the threshold. I found the result is not stable, but it is a guaranteed improvement for the sort-by-one-column case. For multi-column sorting, it does not improve much over the current code.
   > 
   > I meant
   > 
   > > pub sort_in_place_threshold_bytes: usize, default = 4 * 1024 * 1024
   > 
   > seems to be changed?
   
   https://github.com/apache/datafusion/pull/15380#issuecomment-2809956328
   
   In the result above, this was also tuned to 4 * 1024 * 1024, so I set it to 4 * 1024 * 1024; if I change it back to the original value, the performance is worse.
   
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048731630


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +668,81 @@ impl ExternalSorter {
 // space for batches produced by the resulting stream will be reserved 
by the
 // consumer of the stream.
 
-if self.in_mem_batches.len() == 1 {
-let batch = self.in_mem_batches.swap_remove(0);
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
-}
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
 
 // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
 if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+let interleave_indices = self.build_sorted_indices(
+self.in_mem_batches.as_slice(),
+Arc::clone(&self.expr),
+)?;
+
+let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
+let sorted_batch = interleave_record_batch(&batches, 
&interleave_indices)?;
+
 self.in_mem_batches.clear();
 self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
+.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
 .map_err(Self::err_with_oom_context)?;
+
+metrics.record_output(sorted_batch.num_columns());
 let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+drop(reservation);
+
+return Ok(Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(async { Ok(sorted_batch) }),
+)) as SendableRecordBatchStream);
+}
+
+let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+for batch in &in_mem_batches {
+let batch_size = get_reserved_byte_for_record_batch(batch);
+
+// If adding this batch would exceed the memory threshold, merge 
current_batches.
+if current_size + batch_size > self.sort_in_place_threshold_bytes
+&& !current_batches.is_empty()
+{
+self.merge_and_push_sorted_batch(
+&mut current_batches,
+&mut current_size,
+&mut merged_batches,
+)?;
+current_size = 0;
+}
+
+current_batches.push(batch.clone());

Review Comment:
   We merge all batches in groups of up to sort_in_place_threshold_bytes, so it's a loop of merges.
   
   If we only did it once for < sort_in_place_threshold_bytes, I can't see much improvement.
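The loop under discussion, grouping batches greedily by byte size up to `sort_in_place_threshold_bytes` and merging each group, can be sketched with sizes alone. This is a std-only illustration; `chunk_by_threshold` is an invented name, not the PR's API, and the real code merges record batches rather than recording sizes.

```rust
// Std-only sketch of the chunked merge loop: batch sizes (in bytes) are
// grouped greedily so each group stays within the threshold; each group is
// then "merged" (here just collected). Illustrative names only.
fn chunk_by_threshold(batch_sizes: &[usize], threshold: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;
    for &size in batch_sizes {
        // Flush the current group before it would exceed the threshold.
        if current_size + size > threshold && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(size);
        current_size += size;
    }
    // Merge any remaining batches after the loop.
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    let groups = chunk_by_threshold(&[3, 3, 3, 5, 2], 6);
    println!("{:?}", groups); // [[3, 3], [3], [5], [2]]
}
```

Note that a batch larger than the threshold still forms its own group, so the loop never stalls.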









Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812518167

   I think we can do a specific optimization for single-column sort; I will update the code soon.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812444262

   > > Tested latest PR, it seems regression again for current code...
   > 
   > Hmm... could it be because of the increased threshold? I think if we're 
strictly faster and keep the same heuristic we shouldn't have regressions?
   
   I did not change the threshold. I found the result is not stable, but it is a guaranteed improvement for the sort-by-one-column case. For multi-column sorting, it does not improve much over the current code.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812341624

   > Tested latest PR, it seems regression again for current code...
   
   Hmm... could it be because of the increased threshold? I think if we're 
strictly faster and keep the same heuristic we shouldn't have regressions?





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812330239

   Tested the latest PR; it seems there is a regression again with the current code...
   
    ```text
   Benchmark sort_tpch10.json
   
    ┏━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query┃       main ┃ concat_batches_for_sort ┃        Change ┃
    ┡━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ Q1   │  2243.52ms │               1655.67ms │ +1.36x faster │
    │ Q2   │  1842.11ms │               1350.11ms │ +1.36x faster │
    │ Q3   │ 12446.31ms │              12363.56ms │     no change │
    │ Q4   │  4047.55ms │               2873.84ms │ +1.41x faster │
    │ Q5   │  4364.46ms │               4644.86ms │  1.06x slower │
    │ Q6   │  4561.01ms │               4769.54ms │     no change │
    │ Q7   │  8158.01ms │              10058.76ms │  1.23x slower │
    │ Q8   │  6077.40ms │               6947.77ms │  1.14x slower │
    │ Q9   │  6347.21ms │               6379.49ms │     no change │
    │ Q10  │ 11561.03ms │              15190.91ms │  1.31x slower │
    │ Q11  │  6069.42ms │               8268.64ms │  1.36x slower │
    └──────┴────────────┴─────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
    ┃ Benchmark Summary                      ┃            ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
    │ Total Time (main)                      │ 67718.04ms │
    │ Total Time (concat_batches_for_sort)   │ 74503.14ms │
    │ Average Time (main)                    │  6156.19ms │
    │ Average Time (concat_batches_for_sort) │  6773.01ms │
    │ Queries Faster                         │          3 │
    │ Queries Slower                         │          5 │
    │ Queries with No Change                 │          3 │
    └────────────────────────────────────────┴────────────┘
   ```





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048456865


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +669,57 @@ impl ExternalSorter {
 // space for batches produced by the resulting stream will be reserved 
by the
 // consumer of the stream.
 
-if self.in_mem_batches.len() == 1 {
-let batch = self.in_mem_batches.swap_remove(0);
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+
+let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+for batch in &in_mem_batches {
+let batch_size = get_reserved_byte_for_record_batch(batch);
+
+// If adding this batch would exceed the memory threshold, merge 
current_batches.
+if current_size + batch_size > self.sort_in_place_threshold_bytes
+&& !current_batches.is_empty()
+{
+self.merge_and_push_sorted_batch(
+&mut current_batches,
+&mut current_size,
+&mut merged_batches,
+)?;
+current_size = 0;
+}
+
+current_batches.push(batch.clone());
+current_size += batch_size;
 }
 
-// If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-self.in_mem_batches.clear();
-self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
-.map_err(Self::err_with_oom_context)?;
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+// Merge any remaining batches after the loop.
+if !current_batches.is_empty() {
+self.merge_and_push_sorted_batch(
+&mut current_batches,
+&mut current_size,
+&mut merged_batches,
+)?;
 }
 
-let streams = std::mem::take(&mut self.in_mem_batches)
+let streams = merged_batches

Review Comment:
   Good catch @Dandandan, for only one batch we don't need to send it to the streaming merge.






Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048432000


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +669,57 @@ impl ExternalSorter {
 // space for batches produced by the resulting stream will be reserved 
by the
 // consumer of the stream.
 
-if self.in_mem_batches.len() == 1 {
-let batch = self.in_mem_batches.swap_remove(0);
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+
+let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+for batch in &in_mem_batches {
+let batch_size = get_reserved_byte_for_record_batch(batch);
+
+// If adding this batch would exceed the memory threshold, merge 
current_batches.
+if current_size + batch_size > self.sort_in_place_threshold_bytes
+&& !current_batches.is_empty()
+{
+self.merge_and_push_sorted_batch(
+&mut current_batches,
+&mut current_size,
+&mut merged_batches,
+)?;
+current_size = 0;
+}
+
+current_batches.push(batch.clone());
+current_size += batch_size;
 }
 
-// If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-self.in_mem_batches.clear();
-self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
-.map_err(Self::err_with_oom_context)?;
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+// Merge any remaining batches after the loop.
+if !current_batches.is_empty() {
+self.merge_and_push_sorted_batch(
+&mut current_batches,
+&mut current_size,
+&mut merged_batches,
+)?;
 }
 
-let streams = std::mem::take(&mut self.in_mem_batches)
+let streams = merged_batches

Review Comment:
   It seems it still "merges" a batch even in the fully sorted single-batch case?






Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-17 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812031488

   Thank you @Dandandan @2010YOUY01 for the review.
   
   I am trying to clean up the code and do more testing. The result is sometimes unstable when balancing the sort-in-place size against the streaming-merge count: the result differs across environments and machines if we use a fixed sort-in-place size.
   
   And I am thinking: if we could have a thread pool for sort and merge in the future, with the thread count equal to the partition count, and make sort and merge fully async, each sort/merge task could take a thread from the pool so it is always running for the best performance.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


2010YOUY01 commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2811725054

   I really like this idea. I find this approach will not conflict with several future optimizations I have in mind:
   - Use the row format for sorting and reuse the converted `Row`s for SPM; this only requires changing the current `vec` to a single `Row`, and won't introduce a structure change.
   - Eager `Row` conversion for more robust memory accounting https://github.com/apache/datafusion/issues/14748
   
   Additionally, `sort_in_place_threshold_bytes` is relatively easy to tune: it can be set to a typical cache size. If the number of sorted runs is still too large after combining, a cascaded merge can still be applied on top of that.
   
   I think it's a good idea to proceed.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047540213


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
 return self.sort_batch_stream(batch, metrics, reservation);
 }
 
-// If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-self.in_mem_batches.clear();
-self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
-.map_err(Self::err_with_oom_context)?;
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+// Drain in_mem_batches using pop() to release memory earlier.
+// This avoids holding onto the entire vector during iteration.
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+while let Some(batch) = self.in_mem_batches.pop() {
+let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+// If adding this batch would exceed the memory threshold, merge 
current_batches.
+if current_size + batch_size > self.sort_in_place_threshold_bytes
+&& !current_batches.is_empty()
+{
+// = Phase 1: Build global sort columns for each sort 
expression =
+// For each sort expression, evaluate and collect the 
corresponding sort column from each in-memory batch
+// Here, `self.expr` is a list of sort expressions, each 
providing `evaluate_to_sort_column()`,
+// which returns an ArrayRef (in `.values`) and sort options 
(`options`)
+
+/// ```text
+/// columns_by_expr for example:
+/// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+/// │├── ArrayRef_0_1 (from batch_1)
+/// │└── ArrayRef_0_2 (from batch_2)
+/// │
+/// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+///  ├── ArrayRef_1_1 (from batch_1)
+///  └── ArrayRef_1_2 (from batch_2)
+/// ```
+let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+.expr
+.iter()
+.map(|_| Vec::with_capacity(current_batches.len()))
+.collect();
+
+let mut total_rows = 0;
+for batch in &current_batches {
+for (i, expr) in self.expr.iter().enumerate() {
+let col = expr.evaluate_to_sort_column(batch)?.values;
+columns_by_expr[i].push(col);
+}
+total_rows += batch.num_rows();
+}
+
+// For each sort expression, concatenate arrays from all 
batches into one global array
+let mut sort_columns = Vec::with_capacity(self.expr.len());
+for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
+let array = concat(
+&arrays
+.iter()
+.map(|a| a.as_ref())
+.collect::<Vec<_>>(),
+)?;
+sort_columns.push(SortColumn {
+values: array,
+options: expr.options.into(),
+});
+}
+
+// = Phase 2: Compute global sorted indices =
+// Use `lexsort_to_indices` to get global row indices in 
sorted order (as if all batches were concatenated)
+
+let indices = lexsort_to_indices(&sort_columns, None)?;
+
+// = Phase 3: Reorder each column using the global sorted 
indices =
+let batch_indices: Vec<(usize, usize)> = current_batches
+.iter()
+.enumerate()
+.map(|(batch_id, batch)| {
+(0..batch.num_rows()).map(move |i| (batch_id, i))
+})
+.flatten()
+.collect();
+
+// For each column:
+// 1. Concatenate all batch arrays for this column (in the 
same order as assumed by `lexsort_to_indices`)
+// 2. Use Arrow's `take` function to reorder the column by 
sorted indices
+let interleave_indices: Vec<(usize, 

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047534508


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
 return self.sort_batch_stream(batch, metrics, reservation);
 }
 
-// If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-self.in_mem_batches.clear();
-self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
-.map_err(Self::err_with_oom_context)?;
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+// Drain in_mem_batches using pop() to release memory earlier.
+// This avoids holding onto the entire vector during iteration.
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+while let Some(batch) = self.in_mem_batches.pop() {

Review Comment:
   Hmm, shouldn't it do this for all of `self.in_mem_batches` in one go in this case?






Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047526844


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
 return self.sort_batch_stream(batch, metrics, reservation);
 }
 
-// If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-if self.reservation.size() < self.sort_in_place_threshold_bytes {
-// Concatenate memory batches together and sort
-let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-self.in_mem_batches.clear();
-self.reservation
-.try_resize(get_reserved_byte_for_record_batch(&batch))
-.map_err(Self::err_with_oom_context)?;
-let reservation = self.reservation.take();
-return self.sort_batch_stream(batch, metrics, reservation);
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+// Drain in_mem_batches using pop() to release memory earlier.
+// This avoids holding onto the entire vector during iteration.
+// Note:
+// Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+while let Some(batch) = self.in_mem_batches.pop() {
+let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+// If adding this batch would exceed the memory threshold, merge 
current_batches.
+if current_size + batch_size > self.sort_in_place_threshold_bytes
+&& !current_batches.is_empty()
+{
+// = Phase 1: Build global sort columns for each sort 
expression =
+// For each sort expression, evaluate and collect the 
corresponding sort column from each in-memory batch
+// Here, `self.expr` is a list of sort expressions, each 
providing `evaluate_to_sort_column()`,
+// which returns an ArrayRef (in `.values`) and sort options 
(`options`)
+
+/// ```text
+/// columns_by_expr for example:
+/// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+/// │            ├── ArrayRef_0_1 (from batch_1)
+/// │            └── ArrayRef_0_2 (from batch_2)
+/// │
+/// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+///              ├── ArrayRef_1_1 (from batch_1)
+///              └── ArrayRef_1_2 (from batch_2)
+/// ```
+let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+.expr
+.iter()
+.map(|_| Vec::with_capacity(current_batches.len()))
+.collect();
+
+let mut total_rows = 0;

Review Comment:
   `total_rows` doesn't seem used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809956328

   Thank you @Dandandan, I submitted the first version, which is both fast and shows no 
regression. 
   
   Benchmark sort_tpch1.json
   
   ┏━━┳━━┳━┳━━━┓
   ┃ Query┃ main ┃ concat_batches_for_sort ┃Change ┃
   ┡━━╇━━╇━╇━━━┩
   │ Q1   │ 153.49ms │146.72ms │ no change │
   │ Q2   │ 131.29ms │126.38ms │ no change │
   │ Q3   │ 980.57ms │988.13ms │ no change │
   │ Q4   │ 252.25ms │236.61ms │ +1.07x faster │
   │ Q5   │ 464.81ms │471.17ms │ no change │
   │ Q6   │ 481.44ms │485.58ms │ no change │
   │ Q7   │ 810.73ms │725.17ms │ +1.12x faster │
   │ Q8   │ 498.10ms │522.58ms │ no change │
   │ Q9   │ 503.80ms │527.44ms │ no change │
   │ Q10  │ 789.02ms │737.35ms │ +1.07x faster │
   │ Q11  │ 417.39ms │470.56ms │  1.13x slower │
   └──┴──┴─┴───┘
   ┏┳━━━┓
   ┃ Benchmark Summary  ┃   ┃
   ┡╇━━━┩
   │ Total Time (main)  │ 5482.89ms │
   │ Total Time (concat_batches_for_sort)   │ 5437.69ms │
   │ Average Time (main)│  498.44ms │
   │ Average Time (concat_batches_for_sort) │  494.34ms │
   │ Queries Faster │ 3 │
   │ Queries Slower │ 1 │
   │ Queries with No Change │ 7 │
   └┴───┘
   
   Benchmark sort_tpch10.json
   
   ┏━━┳┳━┳━━━┓
   ┃ Query┃   main ┃ concat_batches_for_sort ┃Change ┃
   ┡━━╇╇━╇━━━┩
   │ Q1   │  2243.52ms │   1551.61ms │ +1.45x faster │
   │ Q2   │  1842.11ms │   1339.78ms │ +1.37x faster │
   │ Q3   │ 12446.31ms │  12173.95ms │ no change │
   │ Q4   │  4047.55ms │   3159.04ms │ +1.28x faster │
   │ Q5   │  4364.46ms │   4478.49ms │ no change │
   │ Q6   │  4561.01ms │   4622.45ms │ no change │
   │ Q7   │  8158.01ms │   7864.92ms │ no change │
   │ Q8   │  6077.40ms │   5681.92ms │ +1.07x faster │
   │ Q9   │  6347.21ms │   5817.61ms │ +1.09x faster │
   │ Q10  │ 11561.03ms │   .58ms │ +1.30x faster │
   │ Q11  │  6069.42ms │   .38ms │ +1.09x faster │
   └──┴┴─┴───┘
   ┏┳┓
   ┃ Benchmark Summary  ┃┃
   ┡╇┩
   │ Total Time (main)  │ 67718.04ms │
   │ Total Time (concat_batches_for_sort)   │ 61133.74ms │
   │ Average Time (main)│  6156.19ms │
   │ Average Time (concat_batches_for_sort) │  5557.61ms │
   │ Queries Faster │  7 │
   │ Queries Slower │  0 │
   │ Queries with No Change │  4 │
   └┴┘





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809508850

   So, to change it in your diff (I didn't change the documentation):
   
   I would like to keep the original `StreamingMergeBuilder` case and the `if 
self.reservation.size() < self.sort_in_place_threshold_bytes` expression so 
that we only have the "avoid concat for non-sort columns" optimization in place 
and see if this improves on all sort queries.
   
   ```diff
   
   -// If less than sort_in_place_threshold_bytes, concatenate and sort 
in place
   -if self.reservation.size() < self.sort_in_place_threshold_bytes {
   -// Concatenate memory batches together and sort
   -let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   -self.in_mem_batches.clear();
   -self.reservation
   -.try_resize(get_reserved_byte_for_record_batch(&batch))
   -.map_err(Self::err_with_oom_context)?;
   -let reservation = self.reservation.take();
   -return self.sort_batch_stream(batch, metrics, reservation);
    +let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; self.expr.len()];
   +for batch in &self.in_mem_batches {
   +for (i, expr) in self.expr.iter().enumerate() {
   +let col = expr.evaluate_to_sort_column(batch)?.values;
   +columns_by_expr[i].push(col);
   +}
}

   -let streams = std::mem::take(&mut self.in_mem_batches)
   -.into_iter()
   -.map(|batch| {
   -let metrics = self.metrics.baseline.intermediate();
   -let reservation = self
   -.reservation
   -.split(get_reserved_byte_for_record_batch(&batch));
   -let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   -Ok(spawn_buffered(input, 1))
   -})
    -.collect::<Result<_>>()?;
   +// For each sort expression, concatenate arrays from all batches 
into one global array
   +let mut sort_columns = Vec::with_capacity(self.expr.len());
   +for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
   +let array = concat(
   +&arrays
   +.iter()
   +.map(|a| a.as_ref())
    .collect::<Vec<_>>(),
   +)?;
   +sort_columns.push(SortColumn {
   +values: array,
   +options: expr.options.into(),
   +});
   +}

   -let expressions: LexOrdering = self.expr.iter().cloned().collect();
   +// = Phase 2: Compute global sorted indices =
   +// Use `lexsort_to_indices` to get global row indices in sorted 
order (as if all batches were concatenated)
   +let indices = lexsort_to_indices(&sort_columns, None)?;

   -StreamingMergeBuilder::new()
   -.with_streams(streams)
   -.with_schema(Arc::clone(&self.schema))
   -.with_expressions(expressions.as_ref())
   -.with_metrics(metrics)
   -.with_batch_size(self.batch_size)
   -.with_fetch(None)
   -.with_reservation(self.merge_reservation.new_empty())
   -.build()
   +// = Phase 3: Reorder each column using the global sorted 
indices =
   +let num_columns = self.schema.fields().len();
   +
   +let batch_indices: Vec<(usize, usize)> = self
   +.in_mem_batches
   +.iter()
   +.enumerate()
   +.map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| 
(batch_id, i)))
   +.flatten()
   +.collect();
   +
   +// For each column:
   +// 1. Concatenate all batch arrays for this column (in the same 
order as assumed by `lexsort_to_indices`)
   +// 2. Use Arrow's `take` function to reorder the column by sorted 
indices
   +let interleave_indices: Vec<(usize, usize)> = indices
   +.values()
   +.iter()
   +.map(|x| batch_indices[*x as usize])
   +.collect();
   +// Build a RecordBatch from the sorted columns
   +
   +let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
   +
   +let sorted_batch =
   +interleave_record_batch(batches.as_ref(), &interleave_indices)?;
   +// Clear in-memory batches and update reservation
   +self.in_mem_batches.clear();
   +self.reservation
   +.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
   +let reservation = self.reservation.take();
   +
   +// = Phase 4: Construct the resulting stream =
   +let stream = futures::stream::once(async move {
   +let _timer = met

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809275560

   Interesting! I think it's pretty close, but it still concatenates the input 
arrays in Phase 3. What you can do to avoid that is:
   * create a `Vec<(usize, usize)>` based on the input batches, each element 
being `(batch_id, row_id)`: (0, 1), (0, 2), (0, 3), ...
   * use the global indices to find the (batch_id, row_id) for each sorted 
index into a new `Vec`
   * use `interleave` on all arrays rather than `take`
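
   The three steps above can be sketched with plain `Vec`s. This is a hedged, self-contained illustration: integer keys stand in for the Arrow sort columns, a simple `sort_by_key` stands in for `lexsort_to_indices`, and no actual Arrow `interleave` call is made.

   ```rust
   // Hypothetical sketch of the (batch_id, row_id) mapping described above,
   // using plain Vecs of i32 sort keys in place of Arrow arrays.
   fn main() {
       // Two "batches" of sort-key values (batch_id 0 and batch_id 1).
       let batches: Vec<Vec<i32>> = vec![vec![5, 1], vec![3, 2]];

       // Step 1: one (batch_id, row_id) entry per row, in concatenation order.
       let batch_indices: Vec<(usize, usize)> = batches
           .iter()
           .enumerate()
           .flat_map(|(batch_id, b)| (0..b.len()).map(move |row| (batch_id, row)))
           .collect();

       // Step 2: globally sorted row order, as `lexsort_to_indices` would
       // produce over the concatenated sort column.
       let mut order: Vec<usize> = (0..batch_indices.len()).collect();
       order.sort_by_key(|&i| {
           let (b, r) = batch_indices[i];
           batches[b][r]
       });

       // Step 3: map each sorted global index back to (batch_id, row_id);
       // this is the indices argument that `interleave` expects.
       let interleave_indices: Vec<(usize, usize)> =
           order.iter().map(|&i| batch_indices[i]).collect();

       assert_eq!(interleave_indices, vec![(0, 1), (1, 1), (1, 0), (0, 0)]);
       println!("{:?}", interleave_indices);
   }
   ```

   The resulting `interleave_indices` is exactly the `&[(usize, usize)]` slice that `interleave` (or `interleave_record_batch`) takes, so only the final copy into the sorted output touches the non-key columns.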





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809034811

   
   
   ```rust
   ./bench.sh compare main concat_batches_for_sort
   Comparing main and concat_batches_for_sort
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/clickbench_1.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/clickbench_1.json
 does not exist
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/h2o_join.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/h2o_join.json
 does not exist
   Note: Skipping 
/Users/zhuqi/arrow-datafusion/benchmarks/results/main/sort_tpch.json as 
/Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/sort_tpch.json
 does not exist
   
   Benchmark sort_tpch1.json
   
   ┏━━┳━━┳━┳━━━┓
   ┃ Query┃ main ┃ concat_batches_for_sort ┃Change ┃
   ┡━━╇━━╇━╇━━━┩
   │ Q1   │ 153.49ms │142.66ms │ +1.08x faster │
   │ Q2   │ 131.29ms │110.46ms │ +1.19x faster │
   │ Q3   │ 980.57ms │959.09ms │ no change │
   │ Q4   │ 252.25ms │207.46ms │ +1.22x faster │
   │ Q5   │ 464.81ms │561.74ms │  1.21x slower │
   │ Q6   │ 481.44ms │582.54ms │  1.21x slower │
   │ Q7   │ 810.73ms │910.22ms │  1.12x slower │
   │ Q8   │ 498.10ms │473.39ms │ no change │
   │ Q9   │ 503.80ms │492.94ms │ no change │
   │ Q10  │ 789.02ms │833.81ms │  1.06x slower │
   │ Q11  │ 417.39ms │401.60ms │ no change │
   └──┴──┴─┴───┘
   ┏┳━━━┓
   ┃ Benchmark Summary  ┃   ┃
   ┡╇━━━┩
   │ Total Time (main)  │ 5482.89ms │
   │ Total Time (concat_batches_for_sort)   │ 5675.92ms │
   │ Average Time (main)│  498.44ms │
   │ Average Time (concat_batches_for_sort) │  515.99ms │
   │ Queries Faster │ 3 │
   │ Queries Slower │ 4 │
   │ Queries with No Change │ 4 │
   └┴───┘
   
   Benchmark sort_tpch10.json
   
   ┏━━┳┳━┳━━━┓
   ┃ Query┃   main ┃ concat_batches_for_sort ┃Change ┃
   ┡━━╇╇━╇━━━┩
   │ Q1   │  2243.52ms │   1412.73ms │ +1.59x faster │
   │ Q2   │  1842.11ms │   1113.81ms │ +1.65x faster │
   │ Q3   │ 12446.31ms │  12687.14ms │ no change │
   │ Q4   │  4047.55ms │   1891.10ms │ +2.14x faster │
   │ Q5   │  4364.46ms │   6024.94ms │  1.38x slower │
   │ Q6   │  4561.01ms │   6281.17ms │  1.38x slower │
   │ Q7   │  8158.01ms │  13184.06ms │  1.62x slower │
   │ Q8   │  6077.40ms │   5277.44ms │ +1.15x faster │
   │ Q9   │  6347.21ms │   5308.08ms │ +1.20x faster │
   │ Q10  │ 11561.03ms │  22213.97ms │  1.92x slower │
   │ Q11  │  6069.42ms │   4524.58ms │ +1.34x faster │
   └──┴┴─┴───┘
   ┏┳┓
   ┃ Benchmark Summary  ┃┃
   ┡╇┩
   │ Total Time (main)  │ 67718.04ms │
   │ Total Time (concat_batches_for_sort)   │ 79918.99ms │
   │ Average Time (main)│  6156.19ms │
   │ Average Time (concat_batches_for_sort) │  7265.36ms │
   │ Queries Faster │  6 │
   │ Queries Slower │  4 │
   │ Queries with No Change │  1 │
   └┴┘
   ```
   
   > I wonder if we can make a more "simple" change for now:
   > 
   > * `concat` regresses because it copies the _all columns_ of the 
recordbatch before sorting.
   > * We can concat the sorting columns instead + generate a Vec of 
`(batch_id, row_id)` and map output of `lexsort_to_indices` back to the 
original values.
   > 
   > Doing this I think will benefit existing sorting without (too much) 
regression. We can probably try increasing the in-memory threshold value 
(`sort_in_place_threshold_by

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-16 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808612984

   I wonder if we can make a more "simple" change for now:
   * `concat` regresses because it copies the _all columns_ of the recordbatch 
before sorting.
   * We can concat the sorting columns instead + generate a Vec of `(batch_id, 
row_id)` and map output of `lexsort_to_indices` back to the original values.
   
   Doing this I think will benefit existing sorting without (too much) 
regression. We can probably try increasing the in-memory threshold value 
(`sort_in_place_threshold_bytes`).





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808117814

   > 🤖: Benchmark completed
   > 
   > Details
   > 
   > ```
   > Comparing HEAD and concat_batches_for_sort
   > 
   > Benchmark clickbench_1.json
   > 
   > ┏━━┳┳━┳━━━┓
   > ┃ Query┃   HEAD ┃ concat_batches_for_sort ┃Change ┃
   > ┡━━╇╇━╇━━━┩
   > │ QQuery 0 │ 0.56ms │  0.56ms │ no change │
   > │ QQuery 1 │80.70ms │ 78.53ms │ no change │
   > │ QQuery 2 │   116.89ms │114.69ms │ no change │
   > │ QQuery 3 │   130.38ms │125.49ms │ no change │
   > │ QQuery 4 │   861.43ms │756.05ms │ +1.14x faster │
   > │ QQuery 5 │   878.02ms │869.13ms │ no change │
   > │ QQuery 6 │ 0.67ms │  0.64ms │ no change │
   > │ QQuery 7 │   100.33ms │ 93.51ms │ +1.07x faster │
   > │ QQuery 8 │   979.95ms │956.50ms │ no change │
   > │ QQuery 9 │  1283.74ms │   1245.12ms │ no change │
   > │ QQuery 10│   304.60ms │306.39ms │ no change │
   > │ QQuery 11│   342.54ms │340.89ms │ no change │
   > │ QQuery 12│   930.04ms │933.37ms │ no change │
   > │ QQuery 13│  1337.30ms │   1341.28ms │ no change │
   > │ QQuery 14│   869.61ms │883.04ms │ no change │
   > │ QQuery 15│  1088.81ms │   1083.02ms │ no change │
   > │ QQuery 16│  1841.74ms │   1788.14ms │ no change │
   > │ QQuery 17│  1680.12ms │   1638.39ms │ no change │
   > │ QQuery 18│  3128.65ms │   3139.26ms │ no change │
   > │ QQuery 19│   127.46ms │120.42ms │ +1.06x faster │
   > │ QQuery 20│  1169.35ms │   1195.58ms │ no change │
   > │ QQuery 21│  1472.42ms │   1457.18ms │ no change │
   > │ QQuery 22│  2595.51ms │   2696.08ms │ no change │
   > │ QQuery 23│  8475.08ms │   8735.96ms │ no change │
   > │ QQuery 24│   510.59ms │515.80ms │ no change │
   > │ QQuery 25│   441.39ms │439.44ms │ no change │
   > │ QQuery 26│   569.36ms │581.32ms │ no change │
   > │ QQuery 27│  1850.31ms │   1844.63ms │ no change │
   > │ QQuery 28│ 13503.59ms │  13185.12ms │ no change │
   > │ QQuery 29│   587.04ms │548.23ms │ +1.07x faster │
   > │ QQuery 30│   872.06ms │861.85ms │ no change │
   > │ QQuery 31│   924.05ms │992.86ms │  1.07x slower │
   > │ QQuery 32│  2763.71ms │   2715.48ms │ no change │
   > │ QQuery 33│  3455.95ms │   3450.90ms │ no change │
   > │ QQuery 34│  3466.53ms │   3478.02ms │ no change │
   > │ QQuery 35│  1342.93ms │   1336.22ms │ no change │
   > │ QQuery 36│   179.89ms │185.83ms │ no change │
   > │ QQuery 37│   106.98ms │102.42ms │ no change │
   > │ QQuery 38│   169.57ms │181.57ms │  1.07x slower │
   > │ QQuery 39│   263.27ms │261.33ms │ no change │
   > │ QQuery 40│90.39ms │ 86.94ms │ no change │
   > │ QQuery 41│85.04ms │ 83.29ms │ no change │
   > │ QQuery 42│78.33ms │ 78.86ms │ no change │
   > └──┴┴─┴───┘
   > ┏┳┓
   > ┃ Benchmark Summary  ┃┃
   > ┡╇┩
   > │ Total Time (HEAD)  │ 61056.89ms │
   > │ Total Time (concat_batches_for_sort)   │ 60829.33ms │
   > │ Average Time (HEAD)│  1419.93ms │
   > │ Average Time (concat_batches_for_sort) │  1414.64ms │
   > │ Queries Faster │  4 │
   > │ Queries Slower │  2 │
   > │ Queries with No Change │ 37 │
   > └┴┘
   > 
   > Benchmark clickbench_partitioned.json
   > 
   > ┏━━┳┳━┳━━┓
   > ┃ Query┃   HEAD ┃ concat_batches_for_sort ┃   Change ┃
   > ┡━━╇╇━╇━━┩
   > │ QQuery 0 │ 2.39ms │  2.57ms │ 1.07x slower │

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808114536

   > > I think the most efficient way would be to sort the indices to the 
arrays in one step followed by interleave, without either concat or sort 
followed by merge which would benefit the most from the built in sort algorithm 
and avoids copying the data.
   > 
   > I wonder if we can skip interleave / copying entirely?
   > 
   > Specifically, what if we sorted to indices, as you suggested, but then 
instead of calling `interleave` (which will copy the data) before sending it to 
merge_streams) maybe we could have some way to have the merge cursors also take 
the indices -- so we could only copy data once 🤔
   
   Thanks @alamb , it looks promising.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806923604

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and concat_batches_for_sort
   
   Benchmark clickbench_1.json
   
   ┏━━┳┳━┳━━━┓
   ┃ Query┃   HEAD ┃ concat_batches_for_sort ┃Change ┃
   ┡━━╇╇━╇━━━┩
   │ QQuery 0 │ 0.56ms │  0.56ms │ no change │
   │ QQuery 1 │80.70ms │ 78.53ms │ no change │
   │ QQuery 2 │   116.89ms │114.69ms │ no change │
   │ QQuery 3 │   130.38ms │125.49ms │ no change │
   │ QQuery 4 │   861.43ms │756.05ms │ +1.14x faster │
   │ QQuery 5 │   878.02ms │869.13ms │ no change │
   │ QQuery 6 │ 0.67ms │  0.64ms │ no change │
   │ QQuery 7 │   100.33ms │ 93.51ms │ +1.07x faster │
   │ QQuery 8 │   979.95ms │956.50ms │ no change │
   │ QQuery 9 │  1283.74ms │   1245.12ms │ no change │
   │ QQuery 10│   304.60ms │306.39ms │ no change │
   │ QQuery 11│   342.54ms │340.89ms │ no change │
   │ QQuery 12│   930.04ms │933.37ms │ no change │
   │ QQuery 13│  1337.30ms │   1341.28ms │ no change │
   │ QQuery 14│   869.61ms │883.04ms │ no change │
   │ QQuery 15│  1088.81ms │   1083.02ms │ no change │
   │ QQuery 16│  1841.74ms │   1788.14ms │ no change │
   │ QQuery 17│  1680.12ms │   1638.39ms │ no change │
   │ QQuery 18│  3128.65ms │   3139.26ms │ no change │
   │ QQuery 19│   127.46ms │120.42ms │ +1.06x faster │
   │ QQuery 20│  1169.35ms │   1195.58ms │ no change │
   │ QQuery 21│  1472.42ms │   1457.18ms │ no change │
   │ QQuery 22│  2595.51ms │   2696.08ms │ no change │
   │ QQuery 23│  8475.08ms │   8735.96ms │ no change │
   │ QQuery 24│   510.59ms │515.80ms │ no change │
   │ QQuery 25│   441.39ms │439.44ms │ no change │
   │ QQuery 26│   569.36ms │581.32ms │ no change │
   │ QQuery 27│  1850.31ms │   1844.63ms │ no change │
   │ QQuery 28│ 13503.59ms │  13185.12ms │ no change │
   │ QQuery 29│   587.04ms │548.23ms │ +1.07x faster │
   │ QQuery 30│   872.06ms │861.85ms │ no change │
   │ QQuery 31│   924.05ms │992.86ms │  1.07x slower │
   │ QQuery 32│  2763.71ms │   2715.48ms │ no change │
   │ QQuery 33│  3455.95ms │   3450.90ms │ no change │
   │ QQuery 34│  3466.53ms │   3478.02ms │ no change │
   │ QQuery 35│  1342.93ms │   1336.22ms │ no change │
   │ QQuery 36│   179.89ms │185.83ms │ no change │
   │ QQuery 37│   106.98ms │102.42ms │ no change │
   │ QQuery 38│   169.57ms │181.57ms │  1.07x slower │
   │ QQuery 39│   263.27ms │261.33ms │ no change │
   │ QQuery 40│90.39ms │ 86.94ms │ no change │
   │ QQuery 41│85.04ms │ 83.29ms │ no change │
   │ QQuery 42│78.33ms │ 78.86ms │ no change │
   └──┴┴─┴───┘
   ┏┳┓
   ┃ Benchmark Summary  ┃┃
   ┡╇┩
   │ Total Time (HEAD)  │ 61056.89ms │
   │ Total Time (concat_batches_for_sort)   │ 60829.33ms │
   │ Average Time (HEAD)│  1419.93ms │
   │ Average Time (concat_batches_for_sort) │  1414.64ms │
   │ Queries Faster │  4 │
   │ Queries Slower │  2 │
   │ Queries with No Change │ 37 │
   └┴┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━┳┳━┳━━┓
   ┃ Query┃   HEAD ┃ concat_batches_for_sort ┃   Change ┃
   ┡━━╇╇━╇━━┩
   │ QQuery 0 │ 2.39ms │  2.57ms │ 1.07x slower │
   │ QQuery 1 │36.11ms │ 36.43ms │no change │
   │ QQuery 2 │91.64ms │ 90.30ms │no change

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806867253

   > I think the most efficient way would be to sort the indices to the arrays 
in one step followed by interleave, without either concat or sort followed by 
merge which would benefit the most from the built in sort algorithm and avoids 
copying the data.
   
   I wonder if we can skip interleave / copying entirely?
   
   Specifically, what if we sorted to indices, as you suggested, but then 
instead of calling `interleave` (which will copy the data) before sending it to 
merge_streams) maybe we could have some way to have the merge cursors also take 
the indices -- so we could only copy data once 🤔 





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806860357

   > > 🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking) Running Linux aal-dev 
6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 
GNU/Linux Comparing concat_batches_for_sort 
([6063bc5](https://github.com/apache/datafusion/commit/6063bc572ca61fd71fd57382f94389ec99d9649f))
 to 
[0b01fdf](https://github.com/apache/datafusion/commit/0b01fdf7f02f9097c319204058576f420b9790b4)
 
[diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
 Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1 Results will be 
posted here when complete
   > 
   > Thanks @alamb for triggering this; it seems stuck.
   
   yeah, sorry, I had a bug; retriggered





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806856758

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking) Running
   Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct  4 22:16:29 UTC 2024 
x86_64 x86_64 x86_64 GNU/Linux
   Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) 
to 0b01fdf7f02f9097c319204058576f420b9790b4 
[diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
   Benchmarks: clickbench_1 clickbench_partitioned sort_tpch
   Results will be posted here when complete
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803885281

   > 🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking) Running Linux aal-dev 
6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 
GNU/Linux Comparing concat_batches_for_sort 
([6063bc5](https://github.com/apache/datafusion/commit/6063bc572ca61fd71fd57382f94389ec99d9649f))
 to 
[0b01fdf](https://github.com/apache/datafusion/commit/0b01fdf7f02f9097c319204058576f420b9790b4)
 
[diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
 Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1 Results will be 
posted here when complete
   
   Thanks @alamb for triggering this; it seems stuck.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804582604

   Thank you @2010YOUY01 @Dandandan, it's very interesting. I am thinking:
   
   1. Since the total size of all batches is fixed, we can first calculate the computed 
size of each partition; call it partition_cal_size.
   2. Then we set a min_sort_size and a max_sort_size, and determine 
the final_merged_batch_size:
   ```rust
   final_merged_batch_size = 
 if (partition_cal_size < min_sort_size) => min_sort_size
 else if (partition_cal_size > max_sort_size) => max_sort_size
 else => partition_cal_size
   ```
   This prevents creating too many small batches (which can fragment merge 
tasks) or overly large batches.
   This looks like a first version of the heuristic.
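
   The branching above is just a clamp. A minimal sketch, using the hypothetical names from this comment (`partition_cal_size`, `min_sort_size`, and `max_sort_size` are not existing DataFusion options):

   ```rust
   /// Clamp the computed partition size into [min_sort_size, max_sort_size].
   /// All names here are hypothetical, taken from the discussion above.
   fn final_merged_batch_size(
       partition_cal_size: usize,
       min_sort_size: usize,
       max_sort_size: usize,
   ) -> usize {
       partition_cal_size.clamp(min_sort_size, max_sort_size)
   }

   fn main() {
       assert_eq!(final_merged_batch_size(10, 100, 1_000), 100); // too small -> min
       assert_eq!(final_merged_batch_size(5_000, 100, 1_000), 1_000); // too large -> max
       assert_eq!(final_merged_batch_size(500, 100, 1_000), 500); // in range -> as-is
   }
   ```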
   
   > > > I think for `ExternalSorter` we don't want any additional parallelism 
as the sort is already executed per partition (so additional parallelism is 
likely to hurt rather than help).
   > > 
   > > 
   > > In this case, the final merging might become the bottleneck, because SPM 
does not have internal parallelism either, during the final merge only 1 core 
is busy. I think 2 stages of sort-preserving merge is still needed, becuase 
`ExternalSorter` is blocking, but `SPM` is not, this setup can keep all the 
cores busy after partial sort is finished. We just have to ensure they don't 
have a very large merge degree to become slow (with the optimizations like this 
PR)
   > 
   > Yes, to be clear I don't argue to remove SortPreservingMergeExec or 
sorting in two fases altogether or something similar, just was reacting to the 
idea of adding more parallelism in `in_mem_sort_stream` which probably won't 
help much.
   > 
   > ```
   > SortPreserveMergeExec <= Does k-way merging based on input streams, with 
minimal memory overhead, maximizing input parallelism
   >  SortExec partitions[1,2,3,4,5,6,7,8,9,10] <= Performs in memory 
*sorting* if possible, for each input partition in parallel, only resorting to 
spill/merge when does not fit into memory 
   > ```
   
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804362823

   > I think `concat` followed by `sort` is slower in some cases because
   > 
   > * Concat involves copying the entire batch (rather than only the keys to 
be sorted)
   > * `sort_batch_stream` Can be slower as `lexsort_to_indices` is in cases 
with many columns slower than the Row Format
   > 
   > I think for `ExternalSorter` we don't want any additional parallelism as 
the sort is already executed per partition (so additional parallelism is likely 
to hurt rather than help).
   > 
   > The core improvements that I think are important:
   > 
   > * Minimizing copying of the input batches to one (only once for the output)
   > * Sorting once on the input batches rather than sort followed by merge
   > * A good heuristic on when to switch from `lexsort_to_indices`-like 
sorting to RowConverter + sorting.
   
   Good explanation.
   > I think for ExternalSorter we don't want any additional parallelism as the 
sort is already executed per partition (so additional parallelism is likely to 
hurt rather than help).





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804526858

   > > I think for `ExternalSorter` we don't want any additional parallelism as 
the sort is already executed per partition (so additional parallelism is likely 
to hurt rather than help).
   > 
   > In this case, the final merging might become the bottleneck, because SPM 
does not have internal parallelism either, during the final merge only 1 core 
is busy. I think 2 stages of sort-preserving merge is still needed, becuase 
`ExternalSorter` is blocking, but `SPM` is not, this setup can keep all the 
cores busy after partial sort is finished. We just have to ensure they don't 
have a very large merge degree to become slow (with the optimizations like this 
PR)
   
   Yes, to be clear I don't argue to remove SortPreservingMergeExec or sorting in two phases altogether or something similar; I was just reacting to the idea of adding more parallelism in `in_mem_sort_stream`, which probably won't help much.
   
   ```
   SortPreserveMergeExec <= Does k-way merging based on input streams, with
   minimal memory overhead, maximizing input parallelism
   SortExec partitions[1,2,3,4,5,6,7,8,9,10] <= Performs in memory *sorting*
   if possible, for each input partition in parallel, only resorting to
   spill/merge when it does not fit into memory
   ```





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


2010YOUY01 commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804473128

   > I think for `ExternalSorter` we don't want any additional parallelism as 
the sort is already executed per partition (so additional parallelism is likely 
to hurt rather than help).
   
   In this case, the final merging might become the bottleneck, because SPM 
does not have internal parallelism either, during the final merge only 1 core 
is busy.
   I think 2 stages of sort-preserving merge is still needed, because 
`ExternalSorter` is blocking, but `SPM` is not, this setup can keep all the 
cores busy after partial sort is finished.
   We just have to ensure they don't have a very large merge degree to become 
slow (with the optimizations like this PR)
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804304348

   I think `concat` followed by `sort` is slower because
   
   * Concat involves copying the entire batch (rather than only the keys to be 
sorted)
   * `sort_batch_stream` can be slower, as `lexsort_to_indices` is slower than the Row Format in cases with many columns
   
   I think for `ExternalSorter` we don't want any additional parallelism as the 
sort is already executed per partition.
   
   The core improvements:
   
   * minimizing copying of the input batches to one (only for the output)
   * sorting on the input batches rather than sort followed by merge
   * A good heuristic on when to switch from `lexsort_to_indices`-like sorting 
to RowConverter + sorting.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804216535

   Very interesting. First, I tried merging all in-memory batches and doing a single sort: some queries become dramatically faster and some dramatically slower. I think that's because:
   
   1. We sort in memory without a merge, which is similar to sorting a single partition without partition parallelism?
   2. Previously, some of the merging had partition parallelism?
   
   So as a next step, we can try to make the in-memory sort parallel?
   
   ```rust
   
   Benchmark sort_tpch10.json
   
    ┏━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query ┃       main ┃ concat_batches_for_sort ┃        Change ┃
    ┡━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ Q1    │  2243.52ms │               1416.52ms │ +1.58x faster │
    │ Q2    │  1842.11ms │               1096.12ms │ +1.68x faster │
    │ Q3    │ 12446.31ms │              12535.45ms │     no change │
    │ Q4    │  4047.55ms │               1964.73ms │ +2.06x faster │
    │ Q5    │  4364.46ms │               5955.70ms │  1.36x slower │
    │ Q6    │  4561.01ms │               6275.39ms │  1.38x slower │
    │ Q7    │  8158.01ms │              19145.68ms │  2.35x slower │
    │ Q8    │  6077.40ms │               5146.80ms │ +1.18x faster │
    │ Q9    │  6347.21ms │               5544.48ms │ +1.14x faster │
    │ Q10   │ 11561.03ms │              23572.68ms │  2.04x slower │
    │ Q11   │  6069.42ms │               4810.88ms │ +1.26x faster │
    └───────┴────────────┴─────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
    ┃ Benchmark Summary                       ┃            ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
    │ Total Time (main)                       │ 67718.04ms │
    │ Total Time (concat_batches_for_sort)    │ 87464.44ms │
    │ Average Time (main)                     │  6156.19ms │
    │ Average Time (concat_batches_for_sort)  │  7951.31ms │
    │ Queries Faster                          │          6 │
    │ Queries Slower                          │          4 │
    │ Queries with No Change                  │          1 │
    └─────────────────────────────────────────┴────────────┘
   ```
   
   
   Patch tried:
   ```rust
   diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
   index 7fd1c2b16..ec3cd89f3 100644
   --- a/datafusion/physical-plan/src/sorts/sort.rs
   +++ b/datafusion/physical-plan/src/sorts/sort.rs
   @@ -671,85 +671,14 @@ impl ExternalSorter {
return self.sort_batch_stream(batch, metrics, reservation);
}
   
   -// If less than sort_in_place_threshold_bytes, concatenate and sort 
in place
   -if self.reservation.size() < self.sort_in_place_threshold_bytes {
   -// Concatenate memory batches together and sort
   -let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
   -self.in_mem_batches.clear();
   -self.reservation
   -.try_resize(get_reserved_byte_for_record_batch(&batch))?;
   -let reservation = self.reservation.take();
   -return self.sort_batch_stream(batch, metrics, reservation);
   -}
   -
   -let mut merged_batches = Vec::new();
   -let mut current_batches = Vec::new();
   -let mut current_size = 0;
   -
   -// Drain in_mem_batches using pop() to release memory earlier.
   -// This avoids holding onto the entire vector during iteration.
   -// Note:
   -// Now we use `sort_in_place_threshold_bytes` to determine, in 
future we can make it more dynamic.
   -while let Some(batch) = self.in_mem_batches.pop() {
   -let batch_size = get_reserved_byte_for_record_batch(&batch);
   -
   -// If adding this batch would exceed the memory threshold, 
merge current_batches.
   -if current_size + batch_size > 
self.sort_in_place_threshold_bytes
   -&& !current_batches.is_empty()
   -{
   -// Merge accumulated batches into one.
    -let merged = concat_batches(&self.schema, &current_batches)?;
   -current_batches.clear();
   -
   -// Update memory reservation.
   -self.reservation.try_shrink(current_size)?;
   -let merged_size = 
get_reserved_byte_for_record_batch(&merged);
   -self.reservation.try_grow(merged_size)?;
   -
   -merged_batches.push(merged);
   -current_size = 0;
   -}
   -
   -current_batches.push(batch);
   -current_size += batch_size;
   -}
   -
   -// Merge any remaining batches after the loop.
   -if !current_b

Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804085563

   > #15375 (comment)
   
   I think I got it now @Dandandan: it means we already have those in-memory batches, and we just need to first sort all elements' indices (a 2-level index consisting of (batch_idx, row_idx)). We don't need to construct the StreamingMergeBuilder for the in-memory sort; we can do it in a single sorting pass.
   
   Let me try this approach and compare the performance!





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804057064

   A more complete rationale / explanation of the same idea was written here by 
@2010YOUY01  
https://github.com/apache/datafusion/issues/15375#issuecomment-2747654525 
   
   > An alternative to try to avoid copies is: first sort all elements' indices 
(2-level index consists of (batch_idx, row_idx)), and get a permutation array.
   Use the interleave kernel to construct the final result 
https://docs.rs/arrow/latest/arrow/compute/kernels/interleave/fn.interleave.html
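   The quoted approach can be sketched in self-contained Rust. This is a toy stand-in, not DataFusion code: `i32` values stand in for sort keys, a `Vec<i32>` stands in for a batch, and the hypothetical `interleave_rows` mimics what arrow's `interleave` kernel does for a single column:
   
   ```rust
   /// Build the sorted (batch_idx, row_idx) permutation over all rows
   /// of all batches, without copying any row data.
   fn sorted_permutation(batches: &[Vec<i32>]) -> Vec<(usize, usize)> {
       let mut indices: Vec<(usize, usize)> = batches
           .iter()
           .enumerate()
           .flat_map(|(b, rows)| (0..rows.len()).map(move |r| (b, r)))
           .collect();
       // One sorting pass over the 2-level indices only.
       indices.sort_unstable_by_key(|&(b, r)| batches[b][r]);
       indices
   }
   
   /// Stand-in for arrow's `interleave` kernel on a single column:
   /// gather rows from multiple batches according to the permutation.
   fn interleave_rows(batches: &[Vec<i32>], indices: &[(usize, usize)]) -> Vec<i32> {
       indices.iter().map(|&(b, r)| batches[b][r]).collect()
   }
   
   fn main() {
       let batches = vec![vec![5, 1, 9], vec![3, 7], vec![2, 8, 4, 6]];
       let perm = sorted_permutation(&batches);
       let sorted = interleave_rows(&batches, &perm);
       assert_eq!(sorted, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
   }
   ```
   
   The key property is that the row data is copied only once, in the final interleave step; the sort itself touches only the (batch_idx, row_idx) pairs.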
   
   
   
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-15 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804048977

   > But this PR, we also concat some batches into one batch, do you mean we 
can also use the indices from each batch to one batch just like the merge phase?
   
   I mean theoretically we don't have to `merge`, as all the batches are in memory.
   The merging is useful for sorting streams of data, but I think it is expected that the process of sorting batches first, followed by a custom merge implementation, is slower than one sorting pass based on Rust's std unstable sort (which is optimized for doing a minimal number of comparisons quickly).
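   As a toy illustration of the point (hypothetical `sort_then_merge` and `single_sort` helpers over plain `Vec<i32>` batches, not the DataFusion implementation): sorting each batch and then k-way merging with a heap yields the same output as one `sort_unstable` pass over all rows, while the single pass hands the whole problem to std's pattern-defeating quicksort at once:
   
   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;
   
   /// Sort each batch, then k-way merge with a min-heap of
   /// (value, batch_idx, row_idx) cursors (roughly what a
   /// sort-preserving merge does, minus the streaming machinery).
   fn sort_then_merge(mut batches: Vec<Vec<i32>>) -> Vec<i32> {
       for b in &mut batches {
           b.sort_unstable();
       }
       let mut heap: BinaryHeap<Reverse<(i32, usize, usize)>> = batches
           .iter()
           .enumerate()
           .filter(|(_, b)| !b.is_empty())
           .map(|(i, b)| Reverse((b[0], i, 0)))
           .collect();
       let mut out = Vec::new();
       while let Some(Reverse((v, b, r))) = heap.pop() {
           out.push(v);
           if r + 1 < batches[b].len() {
               heap.push(Reverse((batches[b][r + 1], b, r + 1)));
           }
       }
       out
   }
   
   /// Single pass: flatten and let std's unstable sort handle everything.
   fn single_sort(batches: Vec<Vec<i32>>) -> Vec<i32> {
       let mut all: Vec<i32> = batches.into_iter().flatten().collect();
       all.sort_unstable();
       all
   }
   
   fn main() {
       let batches = vec![vec![4, 1], vec![3, 5, 2]];
       assert_eq!(sort_then_merge(batches.clone()), single_sort(batches));
   }
   ```
   
   Both produce identical output; the difference in practice is the per-row heap bookkeeping of the merge versus one optimized sorting pass.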





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803881895

   It seems that when we merge the sorted batches, we are already using `interleave` to merge via the sorted indices; here is the code:
   
   ```rust
    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
    ///
    /// Will then drop any batches for which all rows have been yielded to the output
    ///
    /// Returns `None` if no pending rows
    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.is_empty() {
            return Ok(None);
        }

        let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self
                    .batches
                    .iter()
                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                    .collect();
                Ok(interleave(&arrays, &self.indices)?)
            })
            .collect::<Result<Vec<_>>>()?;

        self.indices.clear();
   ```
   
   
   
   But in this PR we also concat some batches into one batch; do you mean we could also use the indices from each batch to build one batch, just like the merge phase?





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803478057

   > > Thanks for sharing the results @zhuqi-lucas this is really interesting!
   > > I think it mainly shows that we probably should try and use more 
efficient in memory sorting (e.g. an arrow kernel that sorts multiple batches) 
here rather than use `SortPreservingMergeStream` which is intended to be used 
on data streams. The arrow kernel would avoid the regressions of `concat`.
   > 
   > I think the SortPreservingMergeStream is about as efficient as we know how 
to make it
   > 
   > Maybe we can look into what overhead makes concat'ing better 🤔 Any 
per-stream overhead we can improve in SortPreservingMergeStream would likely 
flow directly to any query that does sorts

   Perhaps SortPreservingMergeStream can be optimized for the cases all 
partitions are exhausted / all batches of all partitions are in memory, so it 
doesn't need to deal with the `cursor` complexity and can use Rust quicksort on 
the remaining data...





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803501353

   Hm, that doesn't make much sense, as 
   
   > > Thanks for sharing the results @zhuqi-lucas this is really interesting!
   > > I think it mainly shows that we probably should try and use more 
efficient in memory sorting (e.g. an arrow kernel that sorts multiple batches) 
here rather than use `SortPreservingMergeStream` which is intended to be used 
on data streams. The arrow kernel would avoid the regressions of `concat`.
   > 
   > I think the SortPreservingMergeStream is about as efficient as we know how 
to make it
   > 
   > Maybe we can look into what overhead makes concat'ing better 🤔 Any 
per-stream overhead we can improve in SortPreservingMergeStream would likely 
flow directly to any query that does sorts
   
   Hm 🤔 ... but that will still take a separate step of sorting the input batches, which, next to sorting, involves a full extra copy using `take` (slower than `concat`), followed by merging the batches. Also, the built-in sort on the entire output is likely to be much faster than doing a merge on the outputs.
   
   I think the most efficient way would be to sort the indices to the arrays in 
one step followed by `interleave`, without either `concat` or `sort` followed 
by `merge` which would benefit the most from the built in sort algorithm and 
avoids copying the data.





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802975607

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking) Running
   Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct  4 22:16:29 UTC 2024 
x86_64 x86_64 x86_64 GNU/Linux
   Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) 
to 0b01fdf7f02f9097c319204058576f420b9790b4 
[diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
   Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1
   Results will be posted here when complete
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802910163

   > Thanks for sharing the results @zhuqi-lucas this is really interesting!
   > 
   > I think it mainly shows that we probably should try and use more efficient 
in memory sorting (e.g. an arrow kernel that sorts multiple batches) here 
rather than use `SortPreservingMergeStream` which is intended to be used on 
data streams. The arrow kernel would avoid the regressions of `concat`.
   
   I think the SortPreservingMergeStream is about as efficient as we know how 
to make it
   
   Maybe we can look into what overhead makes concat'ing better 🤔  Any 
per-stream overhead we can improve in SortPreservingMergeStream would likely 
flow directly to any query that does sorts





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-14 Thread via GitHub


alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802885322

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking) Running
   Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct  4 22:16:29 UTC 2024 
x86_64 x86_64 x86_64 GNU/Linux
   Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) 
to 0b01fdf7f02f9097c319204058576f420b9790b4 
[diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
   Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1
   Results will be posted here when complete
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-12 Thread via GitHub


Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798955035

   Thanks for sharing the results @zhuqi-lucas this is really interesting!
   
   I think it mainly shows that we probably should try and use more efficient 
in memory sorting (e.g. an arrow kernel that sorts multiple batches) here 
rather than use `SortPreservingMergeStream` which is intended to be used on 
data streams.
   The arrow kernel would avoid the regressions of `concat`.
   
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-12 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798879716

   Latest result based current latest code:
   
   ```rust
   
   Benchmark sort_tpch1.json
   
    ┏━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query ┃     main ┃ concat_batches_for_sort ┃        Change ┃
    ┡━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ Q1    │ 153.49ms │                137.57ms │ +1.12x faster │
    │ Q2    │ 131.29ms │                120.93ms │ +1.09x faster │
    │ Q3    │ 980.57ms │                982.22ms │     no change │
    │ Q4    │ 252.25ms │                245.09ms │     no change │
    │ Q5    │ 464.81ms │                449.27ms │     no change │
    │ Q6    │ 481.44ms │                455.45ms │ +1.06x faster │
    │ Q7    │ 810.73ms │                709.74ms │ +1.14x faster │
    │ Q8    │ 498.10ms │                491.12ms │     no change │
    │ Q9    │ 503.80ms │                510.20ms │     no change │
    │ Q10   │ 789.02ms │                706.45ms │ +1.12x faster │
    │ Q11   │ 417.39ms │                411.50ms │     no change │
    └───────┴──────────┴─────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
    ┃ Benchmark Summary                       ┃           ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
    │ Total Time (main)                       │ 5482.89ms │
    │ Total Time (concat_batches_for_sort)    │ 5219.53ms │
    │ Average Time (main)                     │  498.44ms │
    │ Average Time (concat_batches_for_sort)  │  474.50ms │
    │ Queries Faster                          │         5 │
    │ Queries Slower                          │         0 │
    │ Queries with No Change                  │         6 │
    └─────────────────────────────────────────┴───────────┘
   
   Benchmark sort_tpch10.json
   
    ┏━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query ┃       main ┃ concat_batches_for_sort ┃        Change ┃
    ┡━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ Q1    │  2243.52ms │               1825.64ms │ +1.23x faster │
    │ Q2    │  1842.11ms │               1639.00ms │ +1.12x faster │
    │ Q3    │ 12446.31ms │              11981.63ms │     no change │
    │ Q4    │  4047.55ms │               3715.96ms │ +1.09x faster │
    │ Q5    │  4364.46ms │               4503.51ms │     no change │
    │ Q6    │  4561.01ms │               4688.31ms │     no change │
    │ Q7    │  8158.01ms │               7915.54ms │     no change │
    │ Q8    │  6077.40ms │               5524.08ms │ +1.10x faster │
    │ Q9    │  6347.21ms │               5853.44ms │ +1.08x faster │
    │ Q10   │ 11561.03ms │              14235.69ms │  1.23x slower │
    │ Q11   │  6069.42ms │               5666.77ms │ +1.07x faster │
    └───────┴────────────┴─────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
    ┃ Benchmark Summary                       ┃            ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
    │ Total Time (main)                       │ 67718.04ms │
    │ Total Time (concat_batches_for_sort)    │ 67549.58ms │
    │ Average Time (main)                     │  6156.19ms │
    │ Average Time (concat_batches_for_sort)  │  6140.87ms │
    │ Queries Faster                          │          6 │
    │ Queries Slower                          │          1 │
    │ Queries with No Change                  │          4 │
    └─────────────────────────────────────────┴────────────┘
   ```





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-12 Thread via GitHub


zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798868399

   @alamb Do we have the CI benchmark running now? If not, I need your help to run it... Thanks a lot!
   
   Also, for sort-tpch itself, I ran it for the improvement results, but not the other benchmarks.
   
   Previous sort-tpch:
   ```rust
    ┏━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query ┃       main ┃ concat_batches_for_sort ┃        Change ┃
    ┡━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ Q1    │  2241.04ms │               1816.69ms │ +1.23x faster │
    │ Q2    │  1841.01ms │               1496.73ms │ +1.23x faster │
    │ Q3    │ 12755.85ms │              12770.18ms │     no change │
    │ Q4    │  4433.49ms │               3278.70ms │ +1.35x faster │
    │ Q5    │  4414.15ms │               4409.04ms │     no change │
    │ Q6    │  4543.09ms │               4597.32ms │     no change │
    │ Q7    │  8012.85ms │               9026.30ms │  1.13x slower │
    │ Q8    │  6572.37ms │               6049.51ms │ +1.09x faster │
    │ Q9    │  6734.63ms │               6345.69ms │ +1.06x faster │
    │ Q10   │  9896.16ms │               9564.17ms │     no change │
    └───────┴────────────┴─────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
    ┃ Benchmark Summary                       ┃            ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
    │ Total Time (main)                       │ 61444.64ms │
    │ Total Time (concat_batches_for_sort)    │ 59354.33ms │
    │ Average Time (main)                     │  6144.46ms │
    │ Average Time (concat_batches_for_sort)  │  5935.43ms │
    │ Queries Faster                          │          5 │
    │ Queries Slower                          │          1 │
    │ Queries with No Change                  │          4 │
    └─────────────────────────────────────────┴────────────┘
   ```
   





Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]

2025-04-12 Thread via GitHub


zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2040673419


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -645,7 +645,36 @@ impl ExternalSorter {
 return self.sort_batch_stream(batch, metrics, reservation);
 }
 
-let streams = std::mem::take(&mut self.in_mem_batches)
+let mut merged_batches = Vec::new();
+let mut current_batches = Vec::new();
+let mut current_size = 0;
+
+for batch in std::mem::take(&mut self.in_mem_batches) {

Review Comment:
   > I think it would be nice to use `pop` (`while let Some(batch) = v.pop`) 
here to remove the batch from the vec once sorted to reduce memory usage. Now 
the batch is AFAIK retained until after the loop.
   
   Thank you @Dandandan for the review and the good suggestion; addressed!


