Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049125883
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();

-        // Please pay attention that any operation inside of `in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need of handling
-        // reservation failure and spilling in the middle of the sort/merge. The memory
-        // space for batches produced by the resulting stream will be reserved by the
-        // consumer of the stream.
+        // Note: in theory the in-memory batches should have limited size, but some
+        // test cases for the memory limit rely on `sort_in_place_threshold_bytes`,
+        // so we set a larger limit here to avoid test failures.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;

-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;

-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of `in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the need of handling
+            // reservation failure and spilling in the middle of the sort/merge. The memory
+            // space for batches produced by the resulting stream will be reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort
Review Comment:
Thank you for the feedback @Dandandan,
The logic is now:
1. When `self.expr.len() <= 2`, we merge all the in-memory batches into one batch using interleave and don't send them to the merge stream.
2. All other cases keep the original behavior.
I can't see a performance gain for interleave with more than 2 sort columns in the benchmark results, so I didn't change the original concatenate path there.
The remaining challenges are the test failures and the memory model. Because we return a single batch in case 1 above, many cases have no partial merges, only the final merge, so the memory model fails for some cases.
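The interleave approach described above (a global argsort over all in-memory batches producing (batch index, row index) pairs) can be sketched without Arrow, using plain vectors of sort keys as stand-ins for record batches. This is only an illustration of the idea, not DataFusion's actual `build_sorted_indices` implementation:

```rust
// Sketch: compute globally sorted (batch_idx, row_idx) interleave indices
// across several "batches", here modeled as Vec<i32> sort keys. This mirrors
// the idea behind build_sorted_indices + interleave_record_batch: sort once
// globally, then gather each row from its source batch.
fn build_sorted_indices(batches: &[Vec<i32>]) -> Vec<(usize, usize)> {
    // Flatten to (batch_idx, row_idx, key) triples.
    let mut triples: Vec<(usize, usize, i32)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, rows)| rows.iter().enumerate().map(move |(r, &k)| (b, r, k)))
        .collect();
    // Global stable sort by key (lexsort_to_indices plays this role over SortColumns).
    triples.sort_by_key(|&(_, _, k)| k);
    triples.into_iter().map(|(b, r, _)| (b, r)).collect()
}

fn main() {
    let batches = vec![vec![5, 1, 9], vec![3, 7]];
    let indices = build_sorted_indices(&batches);
    // Gather ("interleave") the keys in globally sorted order.
    let sorted: Vec<i32> = indices.iter().map(|&(b, r)| batches[b][r]).collect();
    assert_eq!(sorted, vec![1, 3, 5, 7, 9]);
}
```

The payoff in the real code is that the gather step touches each output row exactly once, instead of streaming batches through a multi-way merge.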
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049170187
##
datafusion/physical-plan/src/sorts/sort.rs:
##
Review Comment:
And I'm not sure why I can't see a performance gain for interleave with more than 2 sort columns in the benchmark results. This also needs investigation.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049110693
##
datafusion/physical-plan/src/sorts/sort.rs:
##
Review Comment:
Hmm... also for this case I think concatenating only the sort arrays instead of whole batches will generally be an optimization (probably as long as there are arrays besides the arrays-to-sort).
Maybe we can combine `self.expr.len() <= 2` and `self.reservation.size() < self.sort_in_place_threshold_bytes` into one condition?
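The "concatenate only the sort arrays" idea can be sketched in plain Rust: concatenate just the key column across batches, argsort it once, and reorder every payload column with the resulting permutation (the role Arrow's `take` kernel would play). The column shapes and batch contents here are hypothetical stand-ins:

```rust
// Sketch: sort multi-column "batches" by concatenating only the key column,
// computing a permutation (argsort), then applying it to payload columns.
fn argsort(keys: &[i32]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by_key(|&i| keys[i]);
    idx
}

// Reorder a column by a permutation, like Arrow's `take` kernel.
fn take<T: Copy>(values: &[T], perm: &[usize]) -> Vec<T> {
    perm.iter().map(|&i| values[i]).collect()
}

fn main() {
    // Two batches, each with a key column and a payload column.
    let key_batches = [vec![4, 1], vec![3, 2]];
    let payload_batches = [vec!["d", "a"], vec!["c", "b"]];

    // Concatenate only the key column (cheap relative to whole batches).
    let keys: Vec<i32> = key_batches.concat();
    let perm = argsort(&keys);

    // Payload columns are reordered via `take`; non-key columns never
    // participate in the comparison work.
    let payload: Vec<&str> = payload_batches.concat();
    assert_eq!(take(&payload, &perm), vec!["a", "b", "c", "d"]);
}
```

The design point being debated: comparisons only ever touch the key arrays, so concatenating the payload columns up front buys nothing and can be deferred or avoided via interleave.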
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048727270
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +668,81 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.

-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;

         // If less than sort_in_place_threshold_bytes, concatenate and sort in place
         if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
+
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
+
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
+
+            metrics.record_output(sorted_batch.num_columns());
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+            drop(reservation);
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
+        }
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to decide; in the future we can
+        // make this more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
Review Comment:
I don't fully get the current code: why does it push a batch here to be used again in the next iteration?
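The size-threshold grouping loop in the diff above (accumulate batches until `sort_in_place_threshold_bytes` would be exceeded, then merge the accumulated group) can be sketched with plain numbers standing in for batch byte sizes; `merge_and_push_sorted_batch` is modeled here as simply pushing the finished group:

```rust
// Sketch: group items by a cumulative size threshold, flushing the current
// group whenever adding the next item would push it over the limit.
fn group_by_threshold(sizes: &[usize], threshold: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;
    for &size in sizes {
        // Flush before this item would exceed the threshold
        // (stand-in for merge_and_push_sorted_batch).
        if current_size + size > threshold && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(size);
        current_size += size;
    }
    // Flush the final partial group.
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    let groups = group_by_threshold(&[3, 4, 2, 6, 1], 7);
    assert_eq!(groups, vec![vec![3, 4], vec![2], vec![6, 1]]);
}
```

The batch is pushed after the flush check so it lands in the next group; the question above is whether the original diff accounts for its size in `current_size` the same way.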
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812614902
Also no regression for the small data set:
```
Benchmark sort_tpch1.json
Query | main     | concat_batches_for_sort | Change
Q1    | 153.49ms | 144.12ms                | +1.07x faster
Q2    | 131.29ms | 117.87ms                | +1.11x faster
Q3    | 980.57ms | 986.92ms                | no change
Q4    | 252.25ms | 205.13ms                | +1.23x faster
Q5    | 464.81ms | 464.38ms                | no change
Q6    | 481.44ms | 474.70ms                | no change
Q7    | 810.73ms | 705.06ms                | +1.15x faster
Q8    | 498.10ms | 503.96ms                | no change
Q9    | 503.80ms | 530.64ms                | 1.05x slower
Q10   | 789.02ms | 705.37ms                | +1.12x faster
Q11   | 417.39ms | 438.07ms                | no change

Benchmark Summary
Total Time (main)                       | 5482.89ms
Total Time (concat_batches_for_sort)    | 5276.20ms
Average Time (main)                     | 498.44ms
Average Time (concat_batches_for_sort)  | 479.65ms
Queries Faster                          | 5
Queries Slower                          | 1
Queries with No Change                  | 5
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812607514
```
Benchmark sort_tpch10.json
Query | main       | concat_batches_for_sort | Change
Q1    | 2243.52ms  | 1525.77ms               | +1.47x faster
Q2    | 1842.11ms  | 1150.71ms               | +1.60x faster
Q3    | 12446.31ms | 12498.09ms              | no change
Q4    | 4047.55ms  | 1949.27ms               | +2.08x faster
Q5    | 4364.46ms  | 4334.64ms               | no change
Q6    | 4561.01ms  | 4538.98ms               | no change
Q7    | 8158.01ms  | 7720.56ms               | +1.06x faster
Q8    | 6077.40ms  | 6010.15ms               | no change
Q9    | 6347.21ms  | 6406.01ms               | no change
Q10   | 11561.03ms | 9081.48ms               | +1.27x faster
Q11   | 6069.42ms  | 4684.90ms               | +1.30x faster

Benchmark Summary
Total Time (main)                       | 67718.04ms
Total Time (concat_batches_for_sort)    | 59900.57ms
Average Time (main)                     | 6156.19ms
Average Time (concat_batches_for_sort)  | 5445.51ms
Queries Faster                          | 6
Queries Slower                          | 0
Queries with No Change                  | 5
```
Updated the results with the latest code: when there are <= 2 sort columns, we sort in memory using interleave. No regression in this test.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812565082
> So to change it in your diff (didn't change the documentation).
>
> I would like to keep the original `StreamingMergeBuilder` case and the `if
self.reservation.size() < self.sort_in_place_threshold_bytes` expression so
that we only have the "avoid concat for non-sort columns" optimization in place
and see if this improves on all sort queries.
>
> ```diff
> -        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
> -        if self.reservation.size() < self.sort_in_place_threshold_bytes {
> -            // Concatenate memory batches together and sort
> -            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
> -            self.in_mem_batches.clear();
> -            self.reservation
> -                .try_resize(get_reserved_byte_for_record_batch(&batch))
> -                .map_err(Self::err_with_oom_context)?;
> -            let reservation = self.reservation.take();
> -            return self.sort_batch_stream(batch, metrics, reservation);
> +            let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; self.expr.len()];
> +            for batch in &self.in_mem_batches {
> +                for (i, expr) in self.expr.iter().enumerate() {
> +                    let col = expr.evaluate_to_sort_column(batch)?.values;
> +                    columns_by_expr[i].push(col);
> +                }
>              }
>
> -        let streams = std::mem::take(&mut self.in_mem_batches)
> -            .into_iter()
> -            .map(|batch| {
> -                let metrics = self.metrics.baseline.intermediate();
> -                let reservation = self
> -                    .reservation
> -                    .split(get_reserved_byte_for_record_batch(&batch));
> -                let input = self.sort_batch_stream(batch, metrics, reservation)?;
> -                Ok(spawn_buffered(input, 1))
> -            })
> -            .collect::<Result<_>>()?;
> +            // For each sort expression, concatenate arrays from all batches into one global array
> +            let mut sort_columns = Vec::with_capacity(self.expr.len());
> +            for (arrays, expr) in columns_by_expr.into_iter().zip(self.expr.iter()) {
> +                let array = concat(
> +                    &arrays
> +                        .iter()
> +                        .map(|a| a.as_ref())
> +                        .collect::<Vec<_>>(),
> +                )?;
> +                sort_columns.push(SortColumn {
> +                    values: array,
> +                    options: expr.options.into(),
> +                });
> +            }
>
> -        let expressions: LexOrdering = self.expr.iter().cloned().collect();
> +            // = Phase 2: Compute global sorted indices =
> +            // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated)
> +            let indices = lexsort_to_indices(&sort_columns, None)?;
>
> -        StreamingMergeBuilder::new()
> -            .with_streams(streams)
> -            .with_schema(Arc::clone(&self.schema))
> -            .with_expressions(expressions.as_ref())
> -            .with_metrics(metrics)
> -            .with_batch_size(self.batch_size)
> -            .with_fetch(None)
> -            .with_reservation(self.merge_reservation.new_empty())
> -            .build()
> +            // = Phase 3: Reorder each column using the global sorted indices =
> +            let num_columns = self.schema.fields().len();
> +
> +            let batch_indices: Vec<(usize, usize)> = self
> +                .in_mem_batches
> +                .iter()
> +                .enumerate()
> +                .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| (batch_id, i)))
> +                .flatten()
> +                .collect();
> +
> +            // For each column:
> +            // 1. Concatenate all batch arrays for this column (in the same order as assumed by `lexsort_to_indices`)
> +            // 2. Use Arrow's `take` function to reorder the column by sorted indices
> +            let interleave_indices: Vec<(usize, usize)> = indices
> +                .values()
> +                .iter()
> +                .map(|x| batch_indices[*x as usize])
> +                .collect();
> +            // Build a RecordBatch from the sorted columns
> +
> +            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
> +
> +            let sorted_batch = interleave_record_batch(batches.as_ref(), &interleave_indices)?;
> +            // Clear in-memory batches and update reservation
> +            self.in_mem_batches.clear();
> +            self.reservation
> +                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
> +            let reservation = s
> ```
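The quoted diff evaluates the sort columns, lexsorts their concatenation, and maps each global sorted index back to a `(batch, row)` pair for `interleave_record_batch`. The index-mapping step can be sketched with plain `Vec`s, independent of Arrow (a single `i32` sort key stands in for the evaluated sort columns, and `sorted_interleave_indices` is a name invented for this sketch, not a DataFusion API):

```rust
// Sketch of the lexsort + interleave index mapping from the diff above,
// using plain Vecs instead of Arrow arrays (illustration only).

/// Compute (batch_id, row) interleave indices that put all rows in globally
/// sorted order, mirroring `lexsort_to_indices` + `batch_indices` in the diff.
fn sorted_interleave_indices(batches: &[Vec<i32>]) -> Vec<(usize, usize)> {
    // Row positions flattened in batch order, as `batch_indices` does.
    let batch_indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, batch)| (0..batch.len()).map(move |r| (b, r)))
        .collect();

    // "Concatenate" the sort column and argsort it (stand-in for lexsort).
    let concat: Vec<i32> = batches.iter().flatten().copied().collect();
    let mut indices: Vec<usize> = (0..concat.len()).collect();
    indices.sort_by_key(|&i| concat[i]);

    // Map each global sorted index back to its source (batch, row) pair;
    // these pairs are what interleave consumes.
    indices.iter().map(|&i| batch_indices[i]).collect()
}

fn main() {
    let batches = vec![vec![5, 1], vec![4, 2, 3]];
    let idx = sorted_interleave_indices(&batches);
    // Gathering rows through the index pairs yields the sorted sequence.
    let sorted: Vec<i32> = idx.iter().map(|&(b, r)| batches[b][r]).collect();
    println!("{:?}", idx);
    println!("{:?}", sorted);
}
```

The payoff of this shape is that only the sort-key columns are concatenated; the payload columns are gathered once via interleave rather than concatenated and re-sorted.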
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812523777

> > > Tested latest PR, it seems regression again for current code...
> >
> > Hmm... could it be because of the increased threshold? I think if we're strictly faster and keep the same heuristic we shouldn't have regressions?
>
> I did not change the threshold. I found the result is not stable, but it will 100% improve the sort-by-one-column case. For multi-column sorting, it does not improve much over the current code.

I meant:

> pub sort_in_place_threshold_bytes: usize, default = 4 * 1024 * 1024

seems to be changed?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812539191

> > pub sort_in_place_threshold_bytes: usize, default = 4 * 1024 * 1024
>
> seems to be changed?

https://github.com/apache/datafusion/pull/15380#issuecomment-2809956328

The benchmark run above also tuned it to 4 * 1024 * 1024, so I set it to 4 * 1024 * 1024 here; if I change it back to the original value, the performance is worse.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048731630
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +668,81 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;

         // If less than sort_in_place_threshold_bytes, concatenate and sort in place
         if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
+
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
+
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
+
+            metrics.record_output(sorted_batch.num_columns());
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+            drop(reservation);
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
+        }
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
Review Comment:
We merge all batches, one group per `sort_in_place_threshold_bytes`, so it's a loop of merges. If we only do it once for < `sort_in_place_threshold_bytes`, I can't see much improvement.
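The loop discussed above packs batches greedily into runs whose estimated size stays under `sort_in_place_threshold_bytes`; each run is then merged in place, and only the merged runs feed the streaming merge. The grouping itself can be sketched with plain byte sizes (`group_by_threshold` is a name invented for this sketch, not a DataFusion API):

```rust
// Sketch of the greedy grouping from the diff above: pack batch indices into
// runs whose total estimated size stays under a threshold (illustration only).
fn group_by_threshold(sizes: &[usize], threshold: usize) -> Vec<Vec<usize>> {
    let mut groups: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_size = 0;
    for (i, &sz) in sizes.iter().enumerate() {
        // Flush the current run first if adding this batch would overflow it,
        // mirroring the `current_size + batch_size > threshold` check.
        if current_size + sz > threshold && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(i);
        current_size += sz;
    }
    // Merge any remaining batches after the loop.
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    // Batch sizes in bytes (made up), threshold 6: a batch larger than the
    // threshold still forms its own single-element run, as in the PR code.
    println!("{:?}", group_by_threshold(&[3, 3, 3, 5, 9], 6));
}
```

Note that with this scheme a single oversized batch is never split; it simply becomes a run of its own, which matches the review point that a lone batch still goes through a "merge".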
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812518167

I think we can do a specific optimization for single-column sort; I will update the code soon.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812444262

> > Tested latest PR, it seems regression again for current code...
>
> Hmm... could it be because of the increased threshold? I think if we're strictly faster and keep the same heuristic we shouldn't have regressions?

I did not change the threshold. I found the result is not stable, but it will 100% improve the sort-by-one-column case. For multi-column sorting, it does not improve much over the current code.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812341624

> Tested latest PR, it seems regression again for current code...

Hmm... could it be because of the increased threshold? I think if we're strictly faster and keep the same heuristic we shouldn't have regressions?
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812330239

Tested latest PR, it seems regression again for current code...

```
Benchmark sort_tpch10.json
Query | main       | concat_batches_for_sort | Change
Q1    | 2243.52ms  | 1655.67ms               | +1.36x faster
Q2    | 1842.11ms  | 1350.11ms               | +1.36x faster
Q3    | 12446.31ms | 12363.56ms              | no change
Q4    | 4047.55ms  | 2873.84ms               | +1.41x faster
Q5    | 4364.46ms  | 4644.86ms               | 1.06x slower
Q6    | 4561.01ms  | 4769.54ms               | no change
Q7    | 8158.01ms  | 10058.76ms              | 1.23x slower
Q8    | 6077.40ms  | 6947.77ms               | 1.14x slower
Q9    | 6347.21ms  | 6379.49ms               | no change
Q10   | 11561.03ms | 15190.91ms              | 1.31x slower
Q11   | 6069.42ms  | 8268.64ms               | 1.36x slower

Benchmark Summary
Total Time (main)                       | 67718.04ms
Total Time (concat_batches_for_sort)    | 74503.14ms
Average Time (main)                     | 6156.19ms
Average Time (concat_batches_for_sort)  | 6773.01ms
Queries Faster                          | 3
Queries Slower                          | 5
Queries with No Change                  | 3
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048456865
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +669,57 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
+            current_size += batch_size;
         }
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        // Merge any remaining batches after the loop.
+        if !current_batches.is_empty() {
+            self.merge_and_push_sorted_batch(
+                &mut current_batches,
+                &mut current_size,
+                &mut merged_batches,
+            )?;
         }
-        let streams = std::mem::take(&mut self.in_mem_batches)
+        let streams = merged_batches
Review Comment:
Good catch @Dandandan , for only one batch we don't need to send to
streaming merge.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048432000
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -667,35 +669,57 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
+            current_size += batch_size;
         }
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        // Merge any remaining batches after the loop.
+        if !current_batches.is_empty() {
+            self.merge_and_push_sorted_batch(
+                &mut current_batches,
+                &mut current_size,
+                &mut merged_batches,
+            )?;
         }
-        let streams = std::mem::take(&mut self.in_mem_batches)
+        let streams = merged_batches
Review Comment:
It seems now it still "merges" a batch even for a fully sorted single batch
case?
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812031488

Thank you @Dandandan @2010YOUY01 for the review. I am trying to clean up the code and do more testing; it seems the result is sometimes unstable when balancing the in-place sort size against the streaming-merge count. The result will differ when running in a different env and computer if we use a fixed sort-in-place size.

And I am thinking: if we can have a thread pool for sort and merge in future, where the thread number is the partition number, and make sort and merge fully async, then each sort/merge task can take from the thread pool, and we can make sure it is always running for the best performance.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
2010YOUY01 commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2811725054

I really like this idea. I find this approach will not conflict with several future optimizations in my mind:
- Use row format for sorting and reuse converted `Row`s for SPM; this only requires changing the current `vec` to a single `Row`, and won't introduce a structure change.
- Eager `Row`s conversion for more robust memory accounting: https://github.com/apache/datafusion/issues/14748

Additionally, `sort_in_place_threshold_bytes` is relatively easy to tune: it can be set as a typical cache size. If the number of sorted runs is still too large after combining, a cascaded merge can still be applied on top of that. I think it's a good idea to proceed.
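The cascaded merge mentioned above can be sketched as repeated rounds of pairwise merges over sorted runs until a single run remains (plain `i32` runs stand in for sorted record batches; both function names are invented for this sketch, not DataFusion APIs):

```rust
// Sketch of a cascaded merge over sorted runs (illustration only).

/// Merge two sorted slices into one sorted Vec.
fn merge_two(a: &[i32], b: &[i32]) -> Vec<i32> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Repeatedly merge runs pairwise until one run remains; each round
/// halves the run count, so the number of rounds is logarithmic.
fn cascaded_merge(mut runs: Vec<Vec<i32>>) -> Vec<i32> {
    while runs.len() > 1 {
        let mut next = Vec::with_capacity(runs.len().div_ceil(2));
        for pair in runs.chunks(2) {
            next.push(if pair.len() == 2 {
                merge_two(&pair[0], &pair[1])
            } else {
                pair[0].clone() // odd run carries over to the next round
            });
        }
        runs = next;
    }
    runs.pop().unwrap_or_default()
}

fn main() {
    let runs = vec![vec![1, 4], vec![2, 3], vec![0, 5]];
    println!("{:?}", cascaded_merge(runs));
}
```

A real implementation would merge with a wider fan-in bounded by available memory rather than strictly pairwise, but the round structure is the same.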
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047540213
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        // Drain in_mem_batches using pop() to release memory earlier.
+        // This avoids holding onto the entire vector during iteration.
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        while let Some(batch) = self.in_mem_batches.pop() {
+            let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                // = Phase 1: Build global sort columns for each sort expression =
+                // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch
+                // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
+                // which returns an ArrayRef (in `.values`) and sort options (`options`)
+
+                /// ```text
+                /// columns_by_expr for example:
+                /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+                /// │            ├── ArrayRef_0_1 (from batch_1)
+                /// │            └── ArrayRef_0_2 (from batch_2)
+                /// │
+                /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+                ///              ├── ArrayRef_1_1 (from batch_1)
+                ///              └── ArrayRef_1_2 (from batch_2)
+                /// ```
+                let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+                    .expr
+                    .iter()
+                    .map(|_| Vec::with_capacity(current_batches.len()))
+                    .collect();
+
+                let mut total_rows = 0;
+                for batch in &current_batches {
+                    for (i, expr) in self.expr.iter().enumerate() {
+                        let col = expr.evaluate_to_sort_column(batch)?.values;
+                        columns_by_expr[i].push(col);
+                    }
+                    total_rows += batch.num_rows();
+                }
+
+                // For each sort expression, concatenate arrays from all batches into one global array
+                let mut sort_columns = Vec::with_capacity(self.expr.len());
+                for (arrays, expr) in columns_by_expr.into_iter().zip(self.expr.iter()) {
+                    let array = concat(
+                        &arrays
+                            .iter()
+                            .map(|a| a.as_ref())
+                            .collect::<Vec<_>>(),
+                    )?;
+                    sort_columns.push(SortColumn {
+                        values: array,
+                        options: expr.options.into(),
+                    });
+                }
+
+                // = Phase 2: Compute global sorted indices =
+                // Use `lexsort_to_indices` to get global row indices in sorted order (as if all batches were concatenated)
+
+                let indices = lexsort_to_indices(&sort_columns, None)?;
+
+                // = Phase 3: Reorder each column using the global sorted indices =
+                let batch_indices: Vec<(usize, usize)> = current_batches
+                    .iter()
+                    .enumerate()
+                    .map(|(batch_id, batch)| {
+                        (0..batch.num_rows()).map(move |i| (batch_id, i))
+                    })
+                    .flatten()
+                    .collect();
+
+                // For each column:
+                // 1. Concatenate all batch arrays for this column (in the same order as assumed by `lexsort_to_indices`)
+                // 2. Use Arrow's `take` function to reorder the column by sorted indices
+                let interleave_indices: Vec<(usize,
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047534508
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        // Drain in_mem_batches using pop() to release memory earlier.
+        // This avoids holding onto the entire vector during iteration.
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        while let Some(batch) = self.in_mem_batches.pop() {
Review Comment:
Hmm. shouldn't it do it for `self.in_mem_batches` in one go in this case?
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047526844
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -673,29 +676,211 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        // Drain in_mem_batches using pop() to release memory earlier.
+        // This avoids holding onto the entire vector during iteration.
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        while let Some(batch) = self.in_mem_batches.pop() {
+            let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                // = Phase 1: Build global sort columns for each sort expression =
+                // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch
+                // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
+                // which returns an ArrayRef (in `.values`) and sort options (`options`)
+
+                /// ```text
+                /// columns_by_expr for example:
+                /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+                /// │            ├── ArrayRef_0_1 (from batch_1)
+                /// │            └── ArrayRef_0_2 (from batch_2)
+                /// │
+                /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+                ///              ├── ArrayRef_1_1 (from batch_1)
+                ///              └── ArrayRef_1_2 (from batch_2)
+                /// ```
+                let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+                    .expr
+                    .iter()
+                    .map(|_| Vec::with_capacity(current_batches.len()))
+                    .collect();
+
+                let mut total_rows = 0;
Review Comment:
`total_rows` doesn't seem used
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380: URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809956328

Thank you @Dandandan, I submitted the first version, which is both fast and has no regression at the same time.

```
Benchmark sort_tpch1.json
Query | main     | concat_batches_for_sort | Change
Q1    | 153.49ms | 146.72ms                | no change
Q2    | 131.29ms | 126.38ms                | no change
Q3    | 980.57ms | 988.13ms                | no change
Q4    | 252.25ms | 236.61ms                | +1.07x faster
Q5    | 464.81ms | 471.17ms                | no change
Q6    | 481.44ms | 485.58ms                | no change
Q7    | 810.73ms | 725.17ms                | +1.12x faster
Q8    | 498.10ms | 522.58ms                | no change
Q9    | 503.80ms | 527.44ms                | no change
Q10   | 789.02ms | 737.35ms                | +1.07x faster
Q11   | 417.39ms | 470.56ms                | 1.13x slower

Benchmark Summary
Total Time (main)                       | 5482.89ms
Total Time (concat_batches_for_sort)    | 5437.69ms
Average Time (main)                     | 498.44ms
Average Time (concat_batches_for_sort)  | 494.34ms
Queries Faster                          | 3
Queries Slower                          | 1
Queries with No Change                  | 7

Benchmark sort_tpch10.json
Query | main       | concat_batches_for_sort | Change
Q1    | 2243.52ms  | 1551.61ms               | +1.45x faster
Q2    | 1842.11ms  | 1339.78ms               | +1.37x faster
Q3    | 12446.31ms | 12173.95ms              | no change
Q4    | 4047.55ms  | 3159.04ms               | +1.28x faster
Q5    | 4364.46ms  | 4478.49ms               | no change
Q6    | 4561.01ms  | 4622.45ms               | no change
Q7    | 8158.01ms  | 7864.92ms               | no change
Q8    | 6077.40ms  | 5681.92ms               | +1.07x faster
Q9    | 6347.21ms  | 5817.61ms               | +1.09x faster
Q10   | 11561.03ms | .58ms                   | +1.30x faster
Q11   | 6069.42ms  | .38ms                   | +1.09x faster

Benchmark Summary
Total Time (main)                       | 67718.04ms
Total Time (concat_batches_for_sort)    | 61133.74ms
Average Time (main)                     | 6156.19ms
Average Time (concat_batches_for_sort)  | 5557.61ms
Queries Faster                          | 7
Queries Slower                          | 0
Queries with No Change                  | 4
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809508850
Here is how to change it in your diff (I didn't change the documentation).
I would like to keep the original `StreamingMergeBuilder` case and the
`if self.reservation.size() < self.sort_in_place_threshold_bytes` expression,
so that we only have the "avoid concat for non-sort columns" optimization in place,
and see if this improves all sort queries.
```diff
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; self.expr.len()];
+        for batch in &self.in_mem_batches {
+            for (i, expr) in self.expr.iter().enumerate() {
+                let col = expr.evaluate_to_sort_column(batch)?.values;
+                columns_by_expr[i].push(col);
+            }
         }
-        let streams = std::mem::take(&mut self.in_mem_batches)
-            .into_iter()
-            .map(|batch| {
-                let metrics = self.metrics.baseline.intermediate();
-                let reservation = self
-                    .reservation
-                    .split(get_reserved_byte_for_record_batch(&batch));
-                let input = self.sort_batch_stream(batch, metrics, reservation)?;
-                Ok(spawn_buffered(input, 1))
-            })
-            .collect::<Result<_>>()?;
+        // For each sort expression, concatenate arrays from all batches into one global array
+        let mut sort_columns = Vec::with_capacity(self.expr.len());
+        for (arrays, expr) in columns_by_expr.into_iter().zip(self.expr.iter()) {
+            let array = concat(
+                &arrays
+                    .iter()
+                    .map(|a| a.as_ref())
+                    .collect::<Vec<_>>(),
+            )?;
+            sort_columns.push(SortColumn {
+                values: array,
+                options: expr.options.into(),
+            });
+        }
-        let expressions: LexOrdering = self.expr.iter().cloned().collect();
+        // = Phase 2: Compute global sorted indices =
+        // Use `lexsort_to_indices` to get global row indices in sorted order
+        // (as if all batches were concatenated)
+        let indices = lexsort_to_indices(&sort_columns, None)?;
-        StreamingMergeBuilder::new()
-            .with_streams(streams)
-            .with_schema(Arc::clone(&self.schema))
-            .with_expressions(expressions.as_ref())
-            .with_metrics(metrics)
-            .with_batch_size(self.batch_size)
-            .with_fetch(None)
-            .with_reservation(self.merge_reservation.new_empty())
-            .build()
+        // = Phase 3: Reorder each column using the global sorted indices =
+        let num_columns = self.schema.fields().len();
+
+        let batch_indices: Vec<(usize, usize)> = self
+            .in_mem_batches
+            .iter()
+            .enumerate()
+            .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| (batch_id, i)))
+            .flatten()
+            .collect();
+
+        // For each column:
+        // 1. Concatenate all batch arrays for this column (in the same order as assumed by `lexsort_to_indices`)
+        // 2. Use Arrow's `take` function to reorder the column by sorted indices
+        let interleave_indices: Vec<(usize, usize)> = indices
+            .values()
+            .iter()
+            .map(|x| batch_indices[*x as usize])
+            .collect();
+        // Build a RecordBatch from the sorted columns
+
+        let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+
+        let sorted_batch =
+            interleave_record_batch(batches.as_ref(), &interleave_indices)?;
+        // Clear in-memory batches and update reservation
+        self.in_mem_batches.clear();
+        self.reservation
+            .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
+        let reservation = self.reservation.take();
+
+        // = Phase 4: Construct the resulting stream =
+        let stream = futures::stream::once(async move {
+            let _timer = met
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809275560
Interesting! I think it's close, but it still concatenates the input arrays in Phase 3. What you can do to avoid that is:

* create a `Vec<(usize, usize)>` based on the input batches, each element being `(batch_id, row_id)`: (0, 1), (0, 2), (0, 3), ...
* use the global indices to find the `(batch_id, row_id)` for each sorted index into a new `Vec`
* use `interleave` on all arrays rather than `take`
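The `(batch_id, row_id)` mapping described above can be sketched in plain Rust. This is illustrative only: `Vec<i64>` stands in for Arrow arrays, and `sort_batches_by_key` is a made-up name, not a DataFusion or arrow-rs API.

```rust
// Sort global row indices once, map each sorted index back to a
// (batch_id, row_id) pair, then gather rows "interleave"-style,
// without ever materializing a concatenated batch.
fn sort_batches_by_key(batches: &[Vec<i64>]) -> Vec<(usize, usize)> {
    // (batch_id, row_id) for every row, in concatenation order --
    // the order `lexsort_to_indices` would see after a concat.
    let batch_indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(batch_id, batch)| (0..batch.len()).map(move |row| (batch_id, row)))
        .collect();

    // Argsort the logical concatenation without building it.
    let mut order: Vec<usize> = (0..batch_indices.len()).collect();
    order.sort_unstable_by_key(|&i| {
        let (b, r) = batch_indices[i];
        batches[b][r]
    });

    // Map sorted global positions back to (batch_id, row_id) pairs,
    // ready to feed something like Arrow's `interleave`.
    order.into_iter().map(|i| batch_indices[i]).collect()
}

fn main() {
    let batches = vec![vec![5, 1], vec![4, 2], vec![3]];
    let idx = sort_batches_by_key(&batches);
    let sorted: Vec<i64> = idx.iter().map(|&(b, r)| batches[b][r]).collect();
    println!("{:?}", idx);    // interleave indices
    println!("{:?}", sorted); // [1, 2, 3, 4, 5]
}
```

Only the key column is touched during sorting; the full-width row copy happens once, at gather time.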
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2809034811

```
./bench.sh compare main concat_batches_for_sort
Comparing main and concat_batches_for_sort
Note: Skipping /Users/zhuqi/arrow-datafusion/benchmarks/results/main/clickbench_1.json as /Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/clickbench_1.json does not exist
Note: Skipping /Users/zhuqi/arrow-datafusion/benchmarks/results/main/h2o_join.json as /Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/h2o_join.json does not exist
Note: Skipping /Users/zhuqi/arrow-datafusion/benchmarks/results/main/sort_tpch.json as /Users/zhuqi/arrow-datafusion/benchmarks/results/concat_batches_for_sort/sort_tpch.json does not exist
```

Benchmark sort_tpch1.json

| Query | main | concat_batches_for_sort | Change |
|---|---|---|---|
| Q1 | 153.49ms | 142.66ms | +1.08x faster |
| Q2 | 131.29ms | 110.46ms | +1.19x faster |
| Q3 | 980.57ms | 959.09ms | no change |
| Q4 | 252.25ms | 207.46ms | +1.22x faster |
| Q5 | 464.81ms | 561.74ms | 1.21x slower |
| Q6 | 481.44ms | 582.54ms | 1.21x slower |
| Q7 | 810.73ms | 910.22ms | 1.12x slower |
| Q8 | 498.10ms | 473.39ms | no change |
| Q9 | 503.80ms | 492.94ms | no change |
| Q10 | 789.02ms | 833.81ms | 1.06x slower |
| Q11 | 417.39ms | 401.60ms | no change |

| Benchmark Summary | |
|---|---|
| Total Time (main) | 5482.89ms |
| Total Time (concat_batches_for_sort) | 5675.92ms |
| Average Time (main) | 498.44ms |
| Average Time (concat_batches_for_sort) | 515.99ms |
| Queries Faster | 3 |
| Queries Slower | 4 |
| Queries with No Change | 4 |

Benchmark sort_tpch10.json

| Query | main | concat_batches_for_sort | Change |
|---|---|---|---|
| Q1 | 2243.52ms | 1412.73ms | +1.59x faster |
| Q2 | 1842.11ms | 1113.81ms | +1.65x faster |
| Q3 | 12446.31ms | 12687.14ms | no change |
| Q4 | 4047.55ms | 1891.10ms | +2.14x faster |
| Q5 | 4364.46ms | 6024.94ms | 1.38x slower |
| Q6 | 4561.01ms | 6281.17ms | 1.38x slower |
| Q7 | 8158.01ms | 13184.06ms | 1.62x slower |
| Q8 | 6077.40ms | 5277.44ms | +1.15x faster |
| Q9 | 6347.21ms | 5308.08ms | +1.20x faster |
| Q10 | 11561.03ms | 22213.97ms | 1.92x slower |
| Q11 | 6069.42ms | 4524.58ms | +1.34x faster |

| Benchmark Summary | |
|---|---|
| Total Time (main) | 67718.04ms |
| Total Time (concat_batches_for_sort) | 79918.99ms |
| Average Time (main) | 6156.19ms |
| Average Time (concat_batches_for_sort) | 7265.36ms |
| Queries Faster | 6 |
| Queries Slower | 4 |
| Queries with No Change | 1 |

> I wonder if we can make a more "simple" change for now:
>
> * `concat` regresses because it copies _all columns_ of the record batch before sorting.
> * We can concat the sorting columns instead + generate a Vec of `(batch_id, row_id)` and map the output of `lexsort_to_indices` back to the original values.
>
> Doing this I think will benefit existing sorting without (too much) regressions. we probably try increase the in memory threshold value (`sort_in_place_threshold_by
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808612984
I wonder if we can make a more "simple" change for now:

* `concat` regresses because it copies _all columns_ of the record batch before sorting.
* We can concat the sorting columns instead + generate a `Vec` of `(batch_id, row_id)` and map the output of `lexsort_to_indices` back to the original values.

Doing this I think will benefit existing sorting without (too much) regression. We should probably also try increasing the in-memory threshold value (`sort_in_place_threshold_bytes`).
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808117814
> 🤖: Benchmark completed
>
> Details
>
> [...]
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2808114536
> I wonder if we can skip interleave / copying entirely?
>
> [...] maybe we could have some way to have the merge cursors also take the indices -- so we could only copy data once 🤔

Thanks @alamb, it looks promising.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806923604
🤖: Benchmark completed

Details

Comparing HEAD and concat_batches_for_sort

Benchmark clickbench_1.json

| Query | HEAD | concat_batches_for_sort | Change |
|---|---|---|---|
| QQuery 0 | 0.56ms | 0.56ms | no change |
| QQuery 1 | 80.70ms | 78.53ms | no change |
| QQuery 2 | 116.89ms | 114.69ms | no change |
| QQuery 3 | 130.38ms | 125.49ms | no change |
| QQuery 4 | 861.43ms | 756.05ms | +1.14x faster |
| QQuery 5 | 878.02ms | 869.13ms | no change |
| QQuery 6 | 0.67ms | 0.64ms | no change |
| QQuery 7 | 100.33ms | 93.51ms | +1.07x faster |
| QQuery 8 | 979.95ms | 956.50ms | no change |
| QQuery 9 | 1283.74ms | 1245.12ms | no change |
| QQuery 10 | 304.60ms | 306.39ms | no change |
| QQuery 11 | 342.54ms | 340.89ms | no change |
| QQuery 12 | 930.04ms | 933.37ms | no change |
| QQuery 13 | 1337.30ms | 1341.28ms | no change |
| QQuery 14 | 869.61ms | 883.04ms | no change |
| QQuery 15 | 1088.81ms | 1083.02ms | no change |
| QQuery 16 | 1841.74ms | 1788.14ms | no change |
| QQuery 17 | 1680.12ms | 1638.39ms | no change |
| QQuery 18 | 3128.65ms | 3139.26ms | no change |
| QQuery 19 | 127.46ms | 120.42ms | +1.06x faster |
| QQuery 20 | 1169.35ms | 1195.58ms | no change |
| QQuery 21 | 1472.42ms | 1457.18ms | no change |
| QQuery 22 | 2595.51ms | 2696.08ms | no change |
| QQuery 23 | 8475.08ms | 8735.96ms | no change |
| QQuery 24 | 510.59ms | 515.80ms | no change |
| QQuery 25 | 441.39ms | 439.44ms | no change |
| QQuery 26 | 569.36ms | 581.32ms | no change |
| QQuery 27 | 1850.31ms | 1844.63ms | no change |
| QQuery 28 | 13503.59ms | 13185.12ms | no change |
| QQuery 29 | 587.04ms | 548.23ms | +1.07x faster |
| QQuery 30 | 872.06ms | 861.85ms | no change |
| QQuery 31 | 924.05ms | 992.86ms | 1.07x slower |
| QQuery 32 | 2763.71ms | 2715.48ms | no change |
| QQuery 33 | 3455.95ms | 3450.90ms | no change |
| QQuery 34 | 3466.53ms | 3478.02ms | no change |
| QQuery 35 | 1342.93ms | 1336.22ms | no change |
| QQuery 36 | 179.89ms | 185.83ms | no change |
| QQuery 37 | 106.98ms | 102.42ms | no change |
| QQuery 38 | 169.57ms | 181.57ms | 1.07x slower |
| QQuery 39 | 263.27ms | 261.33ms | no change |
| QQuery 40 | 90.39ms | 86.94ms | no change |
| QQuery 41 | 85.04ms | 83.29ms | no change |
| QQuery 42 | 78.33ms | 78.86ms | no change |

| Benchmark Summary | |
|---|---|
| Total Time (HEAD) | 61056.89ms |
| Total Time (concat_batches_for_sort) | 60829.33ms |
| Average Time (HEAD) | 1419.93ms |
| Average Time (concat_batches_for_sort) | 1414.64ms |
| Queries Faster | 4 |
| Queries Slower | 2 |
| Queries with No Change | 37 |

Benchmark clickbench_partitioned.json

| Query | HEAD | concat_batches_for_sort | Change |
|---|---|---|---|
| QQuery 0 | 2.39ms | 2.57ms | 1.07x slower |
| QQuery 1 | 36.11ms | 36.43ms | no change |
| QQuery 2 | 91.64ms | 90.30ms | no change
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806867253
> I think the most efficient way would be to sort the indices to the arrays in one step followed by interleave, without either concat or sort followed by merge, which would benefit the most from the built-in sort algorithm and avoids copying the data.

I wonder if we can skip interleave / copying entirely?

Specifically, what if we sorted to indices, as you suggested, but then instead of calling `interleave` (which will copy the data) before sending it to merge_streams, maybe we could have some way to have the merge cursors also take the indices -- so we could only copy data once 🤔
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806860357
> > 🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking) [...] Results will be posted here when complete
>
> Thanks @alamb for this triggering, it seems stuck.

yeah, sorry, I had a bug; retriggered
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2806856758
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking)
Running Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) to 0b01fdf7f02f9097c319204058576f420b9790b4 ([diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f))
Benchmarks: clickbench_1 clickbench_partitioned sort_tpch
Results will be posted here when complete
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803885281
> 🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking) [...] Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1 Results will be posted here when complete

Thanks @alamb for triggering this; it seems stuck.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804582604
Thank you @2010YOUY01 @Dandandan, this is very interesting. I am thinking:

1. Since the sum of all batch sizes is fixed, we can first calculate the computed size of each partition; call it `partition_cal_size`.
2. Then we set a `min_sort_size` and a `max_sort_size` to determine the `final_merged_batch_size`:

```rust
let final_merged_batch_size = if partition_cal_size < min_sort_size {
    min_sort_size
} else if partition_cal_size > max_sort_size {
    max_sort_size
} else {
    partition_cal_size
};
```

This prevents creating too many small batches (which can fragment merge tasks) as well as overly large batches. It looks like a first version of the heuristic.

> > > I think for `ExternalSorter` we don't want any additional parallelism as the sort is already executed per partition (so additional parallelism is likely to hurt rather than help).
> >
> > In this case, the final merging might become the bottleneck, because SPM does not have internal parallelism either; during the final merge only 1 core is busy. I think 2 stages of sort-preserving merge are still needed, because `ExternalSorter` is blocking but `SPM` is not; this setup can keep all the cores busy after the partial sort is finished. We just have to ensure they don't have a very large merge degree and become slow (with the optimizations like this PR).
>
> Yes, to be clear, I don't argue to remove SortPreservingMergeExec or sorting in two phases altogether or something similar; I was just reacting to the idea of adding more parallelism in `in_mem_sort_stream`, which probably won't help much.
>
> ```
> SortPreserveMergeExec <= Does k-way merging based on input streams, with minimal memory overhead, maximizing input parallelism
>   SortExec partitions[1,2,3,4,5,6,7,8,9,10] <= Performs in memory *sorting* if possible, for each input partition in parallel, only resorting to spill/merge when it does not fit into memory
> ```
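The proposed three-way selection is equivalent to a clamp into `[min_sort_size, max_sort_size]`. A minimal sketch, reusing the comment's hypothetical names (`partition_cal_size`, `min_sort_size`, `max_sort_size` are not existing DataFusion config options):

```rust
// Clamp the computed partition size into the [min, max] window,
// mirroring the if/else chain in the heuristic above.
fn final_merged_batch_size(
    partition_cal_size: usize,
    min_sort_size: usize,
    max_sort_size: usize,
) -> usize {
    partition_cal_size.clamp(min_sort_size, max_sort_size)
}

fn main() {
    assert_eq!(final_merged_batch_size(512, 1024, 8192), 1024); // too small -> min
    assert_eq!(final_merged_batch_size(16_384, 1024, 8192), 8192); // too large -> max
    assert_eq!(final_merged_batch_size(4096, 1024, 8192), 4096); // in range -> unchanged
    println!("ok");
}
```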
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804362823
> I think `concat` followed by `sort` is slower in some cases because
>
> * Concat involves copying the entire batch (rather than only the keys to be sorted)
> * `sort_batch_stream` can be slower, as `lexsort_to_indices` is slower than the Row Format in cases with many columns
>
> I think for `ExternalSorter` we don't want any additional parallelism as the sort is already executed per partition (so additional parallelism is likely to hurt rather than help).
>
> The core improvements that I think are important:
>
> * Minimizing copying of the input batches to one (only once for the output)
> * Sorting once on the input batches rather than sort followed by merge
> * A good heuristic on when to switch from `lexsort_to_indices`-like sorting to RowConverter + sorting.

Well explained.

> I think for ExternalSorter we don't want any additional parallelism as the sort is already executed per partition (so additional parallelism is likely to hurt rather than help).
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804526858
> > I think for `ExternalSorter` we don't want any additional parallelism as the sort is already executed per partition (so additional parallelism is likely to hurt rather than help).
>
> In this case, the final merging might become the bottleneck, because SPM does not have internal parallelism either; during the final merge only 1 core is busy. [...]

Yes, to be clear, I don't argue to remove SortPreservingMergeExec or sorting in two phases altogether or something similar; I was just reacting to the idea of adding more parallelism in `in_mem_sort_stream`, which probably won't help much.

```
SortPreserveMergeExec <= Does k-way merging based on input streams, with minimal memory overhead, maximizing input parallelism
  SortExec partitions[1,2,3,4,5,6,7,8,9,10] <= Performs in memory *sorting* if possible, for each input partition in parallel, only resorting to spill/merge when it does not fit into memory
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
2010YOUY01 commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804473128
> I think for `ExternalSorter` we don't want any additional parallelism as the sort is already executed per partition (so additional parallelism is likely to hurt rather than help).

In this case, the final merging might become the bottleneck, because SPM does not have internal parallelism either; during the final merge only 1 core is busy. I think 2 stages of sort-preserving merge are still needed, because `ExternalSorter` is blocking but `SPM` is not; this setup can keep all the cores busy after the partial sort is finished. We just have to ensure they don't have a very large merge degree and become slow (with the optimizations like this PR).
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804304348
I think `concat` followed by `sort` is slower because

* Concat involves copying the entire batch (rather than only the keys to be sorted)
* `sort_batch_stream` can be slower, as `lexsort_to_indices` is slower than the Row Format in cases with many columns

I think for `ExternalSorter` we don't want any additional parallelism, as the sort is already executed per partition.

The core improvements:

* Minimizing copying of the input batches into one (only for the output)
* Sorting on the input batches rather than sort followed by merge
* A good heuristic on when to switch from `lexsort_to_indices`-like sorting to RowConverter + sorting
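The last bullet, per-column lexicographic comparison versus a row-format encoding, can be illustrated with a toy in plain Rust. This is not the arrow-rs `RowConverter`; the byte-encoded keys only order correctly here because the columns are unsigned integers encoded big-endian, and both function names are made up:

```rust
use std::cmp::Ordering;

// Per-column comparator: every comparison walks the key columns left to right.
fn sort_by_columns(cols: &[Vec<u32>]) -> Vec<usize> {
    let n = cols[0].len();
    let mut idx: Vec<usize> = (0..n).collect();
    idx.sort_by(|&a, &b| {
        for col in cols {
            match col[a].cmp(&col[b]) {
                Ordering::Equal => continue,
                non_eq => return non_eq,
            }
        }
        Ordering::Equal
    });
    idx
}

// Row-format style: encode each row once as a byte string whose
// byte order equals the sort order, then compare keys as single slices.
fn sort_by_row_keys(cols: &[Vec<u32>]) -> Vec<usize> {
    let n = cols[0].len();
    let keys: Vec<Vec<u8>> = (0..n)
        .map(|row| cols.iter().flat_map(|c| c[row].to_be_bytes()).collect())
        .collect();
    let mut idx: Vec<usize> = (0..n).collect();
    idx.sort_by(|&a, &b| keys[a].cmp(&keys[b]));
    idx
}

fn main() {
    // Rows: (2,9), (1,7), (2,3), (1,8) -> sorted order is rows 1, 3, 2, 0.
    let cols = vec![vec![2, 1, 2, 1], vec![9, 7, 3, 8]];
    assert_eq!(sort_by_columns(&cols), sort_by_row_keys(&cols));
    println!("{:?}", sort_by_columns(&cols)); // [1, 3, 2, 0]
}
```

With many sort columns, the one-time encoding pays for itself because each of the O(n log n) comparisons becomes a single memcmp-style slice comparison.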
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804216535
Very interesting. First, I tried merging all in-memory batches and doing a single sort; some queries became dramatically faster and some dramatically slower. I think this is because:
1. When we sort in memory without a merge, it is similar to sorting a single partition, without partition parallelism?
2. Previously, some of the merging had partition parallelism?
So as a next step, can we try to make the in-memory sort parallel?
Benchmark sort_tpch10.json

| Query | main | concat_batches_for_sort | Change |
|---|---|---|---|
| Q1 | 2243.52ms | 1416.52ms | +1.58x faster |
| Q2 | 1842.11ms | 1096.12ms | +1.68x faster |
| Q3 | 12446.31ms | 12535.45ms | no change |
| Q4 | 4047.55ms | 1964.73ms | +2.06x faster |
| Q5 | 4364.46ms | 5955.70ms | 1.36x slower |
| Q6 | 4561.01ms | 6275.39ms | 1.38x slower |
| Q7 | 8158.01ms | 19145.68ms | 2.35x slower |
| Q8 | 6077.40ms | 5146.80ms | +1.18x faster |
| Q9 | 6347.21ms | 5544.48ms | +1.14x faster |
| Q10 | 11561.03ms | 23572.68ms | 2.04x slower |
| Q11 | 6069.42ms | 4810.88ms | +1.26x faster |

| Benchmark Summary | |
|---|---|
| Total Time (main) | 67718.04ms |
| Total Time (concat_batches_for_sort) | 87464.44ms |
| Average Time (main) | 6156.19ms |
| Average Time (concat_batches_for_sort) | 7951.31ms |
| Queries Faster | 6 |
| Queries Slower | 4 |
| Queries with No Change | 1 |
Patch tried:

```diff
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 7fd1c2b16..ec3cd89f3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -671,85 +671,14 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
-
-        let mut merged_batches = Vec::new();
-        let mut current_batches = Vec::new();
-        let mut current_size = 0;
-
-        // Drain in_mem_batches using pop() to release memory earlier.
-        // This avoids holding onto the entire vector during iteration.
-        // Note:
-        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
-        while let Some(batch) = self.in_mem_batches.pop() {
-            let batch_size = get_reserved_byte_for_record_batch(&batch);
-
-            // If adding this batch would exceed the memory threshold, merge current_batches.
-            if current_size + batch_size > self.sort_in_place_threshold_bytes
-                && !current_batches.is_empty()
-            {
-                // Merge accumulated batches into one.
-                let merged = concat_batches(&self.schema, &current_batches)?;
-                current_batches.clear();
-
-                // Update memory reservation.
-                self.reservation.try_shrink(current_size)?;
-                let merged_size = get_reserved_byte_for_record_batch(&merged);
-                self.reservation.try_grow(merged_size)?;
-
-                merged_batches.push(merged);
-                current_size = 0;
-            }
-
-            current_batches.push(batch);
-            current_size += batch_size;
-        }
-
-        // Merge any remaining batches after the loop.
-        if !current_b
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804085563

> #15375 (comment)

I think I got it now @Dandandan: since those batches are already in memory, we just need to sort all elements' indices first (a 2-level index consisting of `(batch_idx, row_idx)`); we don't need to construct the `StreamingMergeBuilder` for the in-memory sort, we can just sort it in a single sorting pass. Let me try this way and compare the performance!
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804057064

A more complete rationale / explanation of the same idea was written by @2010YOUY01 in https://github.com/apache/datafusion/issues/15375#issuecomment-2747654525:

> An alternative to try to avoid copies is: first sort all elements' indices (a 2-level index consisting of (batch_idx, row_idx)) to get a permutation array, then use the interleave kernel to construct the final result: https://docs.rs/arrow/latest/arrow/compute/kernels/interleave/fn.interleave.html
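The 2-level-index idea above can be sketched std-only. This is a toy, with `Vec<i32>` standing in for single-column batches and a plain gather loop standing in for arrow's `interleave` kernel:

```rust
// Sketch of the (batch_idx, row_idx) permutation approach: sort the
// indices in one pass, then copy each row exactly once into the output.
fn sort_and_interleave(batches: &[Vec<i32>]) -> Vec<i32> {
    // 1. Build the 2-level index over every row of every batch.
    let mut indices: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, rows)| (0..rows.len()).map(move |r| (b, r)))
        .collect();

    // 2. One unstable sort over all rows; no per-batch sort, no merge step.
    indices.sort_unstable_by_key(|&(b, r)| batches[b][r]);

    // 3. "Interleave": gather rows in permutation order (arrow's interleave
    // kernel does this column-by-column on real RecordBatches).
    indices.iter().map(|&(b, r)| batches[b][r]).collect()
}

fn main() {
    let batches = vec![vec![5, 1, 4], vec![3, 2]];
    assert_eq!(sort_and_interleave(&batches), vec![1, 2, 3, 4, 5]);
}
```

The data itself is copied only in step 3, which is the "avoid copies" property the comment describes.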
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2804048977

> But this PR, we also concat some batches into one batch, do you mean we can also use the indices from each batch to one batch just like the merge phase?

I mean that, theoretically, we don't have to `merge` at all, since all the batches are in memory. Merging is useful for sorting streams of data, but I expect that sorting the batches first and then running a custom merge implementation is slower than one sorting pass based on Rust's std unstable sort (which is optimized to do a minimal number of comparisons quickly).
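The two strategies being compared can be sketched std-only (a toy with `Vec<i32>` batches; a `BinaryHeap` k-way merge stands in for the streaming merge):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Strategy A: sort each batch, then k-way merge with a min-heap.
// Every output row pays a heap push/pop on top of its comparisons.
fn sort_then_merge(mut batches: Vec<Vec<i32>>) -> Vec<i32> {
    for b in &mut batches {
        b.sort_unstable();
    }
    let mut heap: BinaryHeap<Reverse<(i32, usize, usize)>> = batches
        .iter()
        .enumerate()
        .filter(|(_, b)| !b.is_empty())
        .map(|(i, b)| Reverse((b[0], i, 0)))
        .collect();
    let mut out = Vec::new();
    while let Some(Reverse((v, b, r))) = heap.pop() {
        out.push(v);
        if r + 1 < batches[b].len() {
            heap.push(Reverse((batches[b][r + 1], b, r + 1)));
        }
    }
    out
}

// Strategy B: flatten and do one tight sorting pass with std's unstable
// sort (pattern-defeating quicksort).
fn single_pass(batches: Vec<Vec<i32>>) -> Vec<i32> {
    let mut all: Vec<i32> = batches.into_iter().flatten().collect();
    all.sort_unstable();
    all
}

fn main() {
    let batches = vec![vec![5, 1], vec![4, 3, 2]];
    assert_eq!(sort_then_merge(batches.clone()), single_pass(batches));
}
```

Both produce identical output; the argument in the comment is that for fully in-memory data, strategy B avoids the per-row heap overhead that only pays off for streams.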
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803881895
It seems that when we merge the sorted batches, we already use `interleave` to merge the sorted indices; here is the code:
```rust
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
    if self.is_empty() {
        return Ok(None);
    }

    let columns = (0..self.schema.fields.len())
        .map(|column_idx| {
            let arrays: Vec<_> = self
                .batches
                .iter()
                .map(|(_, batch)| batch.column(column_idx).as_ref())
                .collect();
            Ok(interleave(&arrays, &self.indices)?)
        })
        .collect::<Result<Vec<_>>>()?;

    self.indices.clear();
```
But in this PR we also concat some batches into one batch; do you mean we could likewise use the per-batch indices to build a single batch, just like the merge phase does?
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803478057

> > Thanks for sharing the results @zhuqi-lucas this is really interesting!
> >
> > I think it mainly shows that we probably should try and use more efficient in memory sorting (e.g. an arrow kernel that sorts multiple batches) here rather than use `SortPreservingMergeStream` which is intended to be used on data streams. The arrow kernel would avoid the regressions of `concat`.
>
> I think the SortPreservingMergeStream is about as efficient as we know how to make it
>
> Maybe we can look into what overhead makes concat'ing better 🤔 Any per-stream overhead we can improve in SortPreservingMergeStream would likely flow directly to any query that does sorts

Perhaps `SortPreservingMergeStream` can be optimized for the case where all partitions are exhausted / all batches of all partitions are in memory, so it doesn't need to deal with the `cursor` complexity and can use Rust's quicksort on the remaining data...
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2803501353

Hm, that doesn't make much sense, as:

> > Thanks for sharing the results @zhuqi-lucas this is really interesting!
> >
> > I think it mainly shows that we probably should try and use more efficient in memory sorting (e.g. an arrow kernel that sorts multiple batches) here rather than use `SortPreservingMergeStream` which is intended to be used on data streams. The arrow kernel would avoid the regressions of `concat`.
>
> I think the SortPreservingMergeStream is about as efficient as we know how to make it
>
> Maybe we can look into what overhead makes concat'ing better 🤔 Any per-stream overhead we can improve in SortPreservingMergeStream would likely flow directly to any query that does sorts

Hm 🤔 ... but that will still take a separate step of sorting the input batches, which next to sorting involves a full extra copy using `take` (slower than `concat`), followed by merging the batches. Also, the built-in sort on the entire output is likely to be much faster than doing a merge on the outputs.

I think the most efficient way would be to sort the indices into the arrays in one step followed by `interleave`, without either `concat`, or `sort` followed by `merge`; this benefits the most from the built-in sort algorithm and avoids copying the data.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802975607

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking)
Running Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) to 0b01fdf7f02f9097c319204058576f420b9790b4 [diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1
Results will be posted here when complete
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802910163

> Thanks for sharing the results @zhuqi-lucas this is really interesting!
>
> I think it mainly shows that we probably should try and use more efficient in memory sorting (e.g. an arrow kernel that sorts multiple batches) here rather than use `SortPreservingMergeStream` which is intended to be used on data streams. The arrow kernel would avoid the regressions of `concat`.

I think the SortPreservingMergeStream is about as efficient as we know how to make it.

Maybe we can look into what overhead makes concat'ing better 🤔 Any per-stream overhead we can improve in SortPreservingMergeStream would likely flow directly to any query that does sorts.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
alamb commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2802885322

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking)
Running Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Comparing concat_batches_for_sort (6063bc572ca61fd71fd57382f94389ec99d9649f) to 0b01fdf7f02f9097c319204058576f420b9790b4 [diff](https://github.com/apache/datafusion/compare/0b01fdf7f02f9097c319204058576f420b9790b4..6063bc572ca61fd71fd57382f94389ec99d9649f)
Benchmarks: clickbench_1 clickbench_partitioned sort_tpch1
Results will be posted here when complete
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
Dandandan commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798955035

Thanks for sharing the results @zhuqi-lucas, this is really interesting!

I think it mainly shows that we probably should try to use more efficient in-memory sorting here (e.g. an arrow kernel that sorts multiple batches) rather than `SortPreservingMergeStream`, which is intended to be used on data streams. The arrow kernel would avoid the regressions of `concat`.
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798879716

Latest result based on the current latest code:

```
Benchmark sort_tpch1.json

| Query | main     | concat_batches_for_sort | Change        |
|-------|----------|-------------------------|---------------|
| Q1    | 153.49ms | 137.57ms                | +1.12x faster |
| Q2    | 131.29ms | 120.93ms                | +1.09x faster |
| Q3    | 980.57ms | 982.22ms                | no change     |
| Q4    | 252.25ms | 245.09ms                | no change     |
| Q5    | 464.81ms | 449.27ms                | no change     |
| Q6    | 481.44ms | 455.45ms                | +1.06x faster |
| Q7    | 810.73ms | 709.74ms                | +1.14x faster |
| Q8    | 498.10ms | 491.12ms                | no change     |
| Q9    | 503.80ms | 510.20ms                | no change     |
| Q10   | 789.02ms | 706.45ms                | +1.12x faster |
| Q11   | 417.39ms | 411.50ms                | no change     |

| Benchmark Summary                       |           |
|-----------------------------------------|-----------|
| Total Time (main)                       | 5482.89ms |
| Total Time (concat_batches_for_sort)    | 5219.53ms |
| Average Time (main)                     | 498.44ms  |
| Average Time (concat_batches_for_sort)  | 474.50ms  |
| Queries Faster                          | 5         |
| Queries Slower                          | 0         |
| Queries with No Change                  | 6         |

Benchmark sort_tpch10.json

| Query | main       | concat_batches_for_sort | Change        |
|-------|------------|-------------------------|---------------|
| Q1    | 2243.52ms  | 1825.64ms               | +1.23x faster |
| Q2    | 1842.11ms  | 1639.00ms               | +1.12x faster |
| Q3    | 12446.31ms | 11981.63ms              | no change     |
| Q4    | 4047.55ms  | 3715.96ms               | +1.09x faster |
| Q5    | 4364.46ms  | 4503.51ms               | no change     |
| Q6    | 4561.01ms  | 4688.31ms               | no change     |
| Q7    | 8158.01ms  | 7915.54ms               | no change     |
| Q8    | 6077.40ms  | 5524.08ms               | +1.10x faster |
| Q9    | 6347.21ms  | 5853.44ms               | +1.08x faster |
| Q10   | 11561.03ms | 14235.69ms              | 1.23x slower  |
| Q11   | 6069.42ms  | 5666.77ms               | +1.07x faster |

| Benchmark Summary                       |            |
|-----------------------------------------|------------|
| Total Time (main)                       | 67718.04ms |
| Total Time (concat_batches_for_sort)    | 67549.58ms |
| Average Time (main)                     | 6156.19ms  |
| Average Time (concat_batches_for_sort)  | 6140.87ms  |
| Queries Faster                          | 6          |
| Queries Slower                          | 1          |
| Queries with No Change                  | 4          |
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2798868399

@alamb Do we have the CI benchmark running now? If not, I need your help to run it... Thanks a lot!

Also, for sort-tpch itself, I ran it for the improvement result, but I have not run the other benchmarks.

Previous sort-tpch:

```
| Query | main       | concat_batches_for_sort | Change        |
|-------|------------|-------------------------|---------------|
| Q1    | 2241.04ms  | 1816.69ms               | +1.23x faster |
| Q2    | 1841.01ms  | 1496.73ms               | +1.23x faster |
| Q3    | 12755.85ms | 12770.18ms              | no change     |
| Q4    | 4433.49ms  | 3278.70ms               | +1.35x faster |
| Q5    | 4414.15ms  | 4409.04ms               | no change     |
| Q6    | 4543.09ms  | 4597.32ms               | no change     |
| Q7    | 8012.85ms  | 9026.30ms               | 1.13x slower  |
| Q8    | 6572.37ms  | 6049.51ms               | +1.09x faster |
| Q9    | 6734.63ms  | 6345.69ms               | +1.06x faster |
| Q10   | 9896.16ms  | 9564.17ms               | no change     |

| Benchmark Summary                       |            |
|-----------------------------------------|------------|
| Total Time (main)                       | 61444.64ms |
| Total Time (concat_batches_for_sort)    | 59354.33ms |
| Average Time (main)                     | 6144.46ms  |
| Average Time (concat_batches_for_sort)  | 5935.43ms  |
| Queries Faster                          | 5          |
| Queries Slower                          | 1          |
| Queries with No Change                  | 4          |
```
Re: [PR] Perf: Support automatically concat_batches for sort which will improve performance [datafusion]
zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2040673419
##
datafusion/physical-plan/src/sorts/sort.rs:
##
```diff
@@ -645,7 +645,36 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        let streams = std::mem::take(&mut self.in_mem_batches)
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        for batch in std::mem::take(&mut self.in_mem_batches) {
```
Review Comment:
> I think it would be nice to use `pop` (`while let Some(batch) = v.pop()`) here to remove the batch from the vec once sorted, to reduce memory usage. Now the batch is AFAIK retained until after the loop.

Thank you @Dandandan for the review and the good suggestion; addressed!
