Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-18 Thread via GitHub


mbutrovich commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985161552

   @Kontinuation thank you for bringing this up! Let me investigate. In the 
meantime I suspect we'll revert this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-18 Thread via GitHub


Kontinuation commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985113515

   This implementation of RangePartitioning may be incorrect. RangePartitioning 
should split the input DataFrame into partitions covering consecutive, 
non-overlapping ranges; this requires scanning the entire DataFrame to obtain 
the bounds of each partition before performing the actual shuffle writes.
   
   Here is PySpark code that illustrates the difference between the behavior of 
Comet and vanilla Spark.
   
   ```python
   spark.range(0, 10).write.format("parquet").mode("overwrite").save("range-partitioning")

   df = spark.read.parquet("range-partitioning")
   df_range_partitioned = df.repartitionByRange(10, "id")

   df_range_partitioned.explain()

   # Show the min and max of each range
   def get_partition_bounds(idx, iterator):
       min = None
       max = None
       for row in iterator:
           if min is None or row.id < min:
               min = row.id
           if max is None or row.id > max:
               max = row.id
       yield idx, min, max

   partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

   # Print the results
   for partition_id, min_id, max_id in sorted(partition_bounds):
       print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
   ```
   
   **Comet**:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), 
REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
  +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: 
CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   
   Partition 0: min_id=0, max_id=90799
   Partition 1: min_id=753, max_id=91680
   Partition 2: min_id=1527, max_id=92520
   Partition 3: min_id=2399, max_id=93284
   Partition 4: min_id=3274, max_id=94123
   Partition 5: min_id=4053, max_id=94844
   Partition 6: min_id=4851, max_id=95671
   Partition 7: min_id=5738, max_id=96522
   Partition 8: min_id=6571, max_id=97335
   Partition 9: min_id=7408, max_id=9
   ```
   
   **Spark**:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), 
REPARTITION_BY_NUM, [plan_id=197]
  +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: 
Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   
   Partition 0: min_id=0, max_id=9974
   Partition 1: min_id=9975, max_id=19981
   Partition 2: min_id=19982, max_id=29993
   Partition 3: min_id=29994, max_id=39997
   Partition 4: min_id=39998, max_id=49959
   Partition 5: min_id=49960, max_id=59995
   Partition 6: min_id=59996, max_id=69898
   Partition 7: min_id=69899, max_id=79970
   Partition 8: min_id=79971, max_id=89976
   Partition 9: min_id=89977, max_id=9
   ```
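   
   For reference, bound-based assignment is a binary search: each key belongs to 
the first partition whose upper bound exceeds it, which is what makes the 
resulting ranges consecutive and non-overlapping when a single global set of 
bounds is used. A minimal Rust sketch with illustrative, hypothetical bounds 
(mirroring the `partition_point` lookup used in the native implementation):

```rust
// Assign a key to the first range whose upper bound exceeds it. `bounds` are
// illustrative sorted upper boundaries between partitions (Spark derives the
// real ones by sampling the whole input).
fn assign_partition(bounds: &[u64], key: u64) -> usize {
    // Number of bounds <= key, i.e. index of the first bound > key.
    bounds.partition_point(|b| *b <= key)
}

fn main() {
    let bounds = [10_000u64, 20_000, 30_000]; // 4 partitions
    assert_eq!(assign_partition(&bounds, 9_999), 0);
    assert_eq!(assign_partition(&bounds, 10_000), 1);
    assert_eq!(assign_partition(&bounds, 35_000), 3);
}
```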





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-17 Thread via GitHub


mbutrovich merged PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-14 Thread via GitHub


mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146947020


##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =

Review Comment:
   That's basically every unit test already (including the updated native 
shuffle suite and fuzz test).






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-13 Thread via GitHub


parthchandra commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146243533


##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =

Review Comment:
   That's what I thought. Is there a way to add a unit test with both enabled?






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-13 Thread via GitHub


mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146241932


##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =

Review Comment:
   The default is both enabled. Each flag individually controls whether hash or 
range partitioning, respectively, falls back to Spark.






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-13 Thread via GitHub


parthchandra commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146236444


##
spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala:
##
@@ -120,29 +120,51 @@ class CometNativeShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper
 }
   }
 
-  test("native operator after native shuffle") {
-    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-      val df = sql("SELECT * FROM tbl")
-
-      val shuffled1 = df
-        .repartition(10, $"_2")
-        .select($"_1", $"_1" + 1, $"_2" + 2)
-        .repartition(10, $"_1")
-        .filter($"_1" > 1)
-
-      // 2 Comet shuffle exchanges are expected
-      checkShuffleAnswer(shuffled1, 2)
-
-      val shuffled2 = df
-        .repartitionByRange(10, $"_2")
-        .select($"_1", $"_1" + 1, $"_2" + 2)
-        .repartition(10, $"_1")
-        .filter($"_1" > 1)
-
-      // Because the first exchange from the bottom is range exchange which native shuffle
-      // doesn't support. So Comet exec operators stop before the first exchange and thus
-      // there is no Comet exchange.
-      checkShuffleAnswer(shuffled2, 0)
+  test("native operator after native shuffle with hash partitioning") {
+    Seq("true", "false").foreach { hashPartitioningEnabled =>
+      withSQLConf(
+        CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED.key ->
+          hashPartitioningEnabled) {

Review Comment:
   We could probably merge these two tests?
   ```
   Seq("true", "false").foreach { partitioningEnabled =>
     Seq(
       CometConf.COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED,
       CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED).foreach { partitioningType =>
       withSQLConf(partitioningType.key -> partitioningEnabled) {
         // ...
       }
     }
   }
   ```



##
common/src/main/scala/org/apache/comet/CometConf.scala:
##
@@ -307,6 +307,18 @@ object CometConf extends ShimCometConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =

Review Comment:
   Can a user have both configs enabled? What happens?






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-13 Thread via GitHub


andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2146006687


##
dev/diffs/3.4.3.diff:
##
@@ -2404,7 +2411,31 @@ index 266bb343526..c3e3d155813 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
  }
}
-@@ -1026,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -895,6 +928,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+ 
+   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") 
{
+ withSQLConf(
++  CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> 
"false",

Review Comment:
   Could you add comments where this is disabled, explaining why we must do 
this? I know this is needed, but I will likely forget why.






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-13 Thread via GitHub


mbutrovich commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2970529921

   Looking at the 3 Spark SQL test failures (all related to bucket scan) now 
that there are fewer 3.5.x diffs to update.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-11 Thread via GitHub


mbutrovich commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2964065141

   Last thing I am waiting on is to do a new set of Spark diffs to turn off 
native RangePartitioning in the 3 bucketing-related tests. Because of the 
different random number generator (for sampling) we get different results. I'll 
do that after #1870 and #1873 merge.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-11 Thread via GitHub


mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2139997704


##
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##
@@ -2904,6 +2903,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
 supported
   case SinglePartition =>
 inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+  case RangePartitioning(_, _) =>
+true

Review Comment:
   I had intended to write fallback rules there, but haven't found a scenario 
to add to that yet.






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-10 Thread via GitHub


andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821947


##
native/core/src/execution/shuffle/range_partitioner.rs:
##
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::compute::{take_arrays, TakeOptions};
+use arrow::row::{Row, RowConverter, Rows, SortField};
+use datafusion::physical_expr::LexOrdering;
+use rand::{rngs::SmallRng, Rng, SeedableRng};
+
+pub struct RangePartitioner;
+
+impl RangePartitioner {
+/// Given a number of rows, sample size, and a random seed, generates 
unique indices to take
+/// from an input batch to act as a random sample.
+/// Adapted from 
https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L
+/// We use sample_size instead of k and num_rows instead of n.
+/// We use indices instead of actual values in the reservoir  since we'll 
do one take() on the
+/// input arrays at the end.
+pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: 
u64) -> Vec<u64> {
+assert!(sample_size > 0);
+assert!(
+num_rows > sample_size,
+"Sample size > num_rows yields original batch."
+);
+
+// Initialize our reservoir with indices of the first |sample_size| 
elements.
+let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
+
+let mut rng = SmallRng::seed_from_u64(seed);
+let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp();
+let mut i = sample_size - 1;
+
+while i < num_rows {
+i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize + 1;
+
+if i < num_rows {
+// Replace a random item in the reservoir with i
+let random_index = rng.random_range(0..sample_size);
+reservoir[random_index] = i as u64;
+w *= (rng.random::<f64>().ln() / sample_size as f64).exp();
+}
+}
+
+reservoir
+}
+
+/// Given a batch of Rows, an ordered vector of Rows that represent 
partition boundaries, and
+/// a slice with enough space for the input batch, determines a partition 
id for every input
+/// Row using binary search.
+pub fn partition_indices_for_batch(
+row_batch: &Rows,
+partition_bounds_vec: &Vec<Row>,
+partition_ids: &mut [u32],
+) {
+row_batch.iter().enumerate().for_each(|(row_idx, row)| {
+partition_ids[row_idx] =
+partition_bounds_vec.partition_point(|bound| *bound <= row) as 
u32
+});
+}
+
+/// Given input arrays and range partitioning metadata: samples the input 
arrays, generates
+/// partition bounds, and returns Rows (for comparison against) and a 
RowConverter (for
+/// adapting future incoming batches).
+pub fn generate_bounds(
+partition_arrays: &Vec<ArrayRef>,
+lex_ordering: &LexOrdering,
+num_output_partitions: usize,
+num_rows: usize,
+sample_size: usize,
+seed: u64,
+) -> (Rows, RowConverter) {
+let sampled_columns = if sample_size < num_rows {
+// Construct our sample indices.
+let sample_indices = 
UInt64Array::from(RangePartitioner::reservoir_sample_indices(
+num_rows,
+sample_size,
+seed,
+));
+
+// Extract our sampled data from the input data.
+take_arrays(
+partition_arrays,
+&sample_indices,
+Some(TakeOptions {
+check_bounds: false,
+}),
+)
+.unwrap()
+} else {
+// Requested sample_size is larger than the batch, so just use the 
batch.
+partition_arrays.clone()
+};
+
+// Generate our bounds indices.
+let sort_fields: Vec<SortField> = partition_arrays
+.iter()
+.zip(lex_ordering)
+.map(|(array, sort_expr)| {
+SortField::new_with_options(array.data_type().clone(), 
sort_expr.options)
+})
+.collect();
+
+let (bo

Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-10 Thread via GitHub


andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138822721


##
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##
@@ -2904,6 +2903,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
 supported
   case SinglePartition =>
 inputs.forall(attr => supportedShuffleDataType(attr.dataType))
+  case RangePartitioning(_, _) =>
+true

Review Comment:
   This looks like a placeholder for additional checks?






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-10 Thread via GitHub


andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821450


##
native/core/src/execution/shuffle/range_partitioner.rs:
##
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::compute::{take_arrays, TakeOptions};
+use arrow::row::{Row, RowConverter, Rows, SortField};
+use datafusion::physical_expr::LexOrdering;
+use rand::{rngs::SmallRng, Rng, SeedableRng};
+
+pub struct RangePartitioner;
+
+impl RangePartitioner {
+/// Given a number of rows, sample size, and a random seed, generates 
unique indices to take
+/// from an input batch to act as a random sample.
+/// Adapted from 
https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L
+/// We use sample_size instead of k and num_rows instead of n.
+/// We use indices instead of actual values in the reservoir  since we'll 
do one take() on the
+/// input arrays at the end.
+pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: 
u64) -> Vec<u64> {
+assert!(sample_size > 0);
+assert!(
+num_rows > sample_size,
+"Sample size > num_rows yields original batch."
+);
+
+// Initialize our reservoir with indices of the first |sample_size| 
elements.
+let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
+
+let mut rng = SmallRng::seed_from_u64(seed);
+let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp();
+let mut i = sample_size - 1;
+
+while i < num_rows {
+i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize + 1;
+
+if i < num_rows {
+// Replace a random item in the reservoir with i
+let random_index = rng.random_range(0..sample_size);
+reservoir[random_index] = i as u64;
+w *= (rng.random::<f64>().ln() / sample_size as f64).exp();
+}
+}
+
+reservoir
+}
+
+/// Given a batch of Rows, an ordered vector of Rows that represent 
partition boundaries, and
+/// a slice with enough space for the input batch, determines a partition 
id for every input
+/// Row using binary search.
+pub fn partition_indices_for_batch(
+row_batch: &Rows,
+partition_bounds_vec: &Vec<Row>,
+partition_ids: &mut [u32],
+) {
+row_batch.iter().enumerate().for_each(|(row_idx, row)| {
+partition_ids[row_idx] =
+partition_bounds_vec.partition_point(|bound| *bound <= row) as 
u32
+});
+}
+
+/// Given input arrays and range partitioning metadata: samples the input 
arrays, generates
+/// partition bounds, and returns Rows (for comparison against) and a 
RowConverter (for
+/// adapting future incoming batches).
+pub fn generate_bounds(
+partition_arrays: &Vec<ArrayRef>,
+lex_ordering: &LexOrdering,
+num_output_partitions: usize,
+num_rows: usize,
+sample_size: usize,
+seed: u64,
+) -> (Rows, RowConverter) {

Review Comment:
   We should return a `Result` here and replace the unwraps with `?` in this 
function
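   
   As a standalone sketch of that suggestion (simplified, hypothetical types 
rather than the actual arrow signatures): have the function return a `Result` 
and propagate inner failures with `?` so callers decide how to handle errors 
instead of panicking on `unwrap()`.

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for a fallible step inside generate_bounds: each
// fallible call uses `?` so the error propagates to the caller.
fn parse_bounds(raw: &[&str]) -> Result<Vec<u64>, ParseIntError> {
    let mut bounds = Vec::with_capacity(raw.len());
    for s in raw {
        bounds.push(s.parse::<u64>()?); // previously: s.parse().unwrap()
    }
    Ok(bounds)
}

fn main() {
    assert_eq!(parse_bounds(&["10", "20"]).unwrap(), vec![10, 20]);
    assert!(parse_bounds(&["oops"]).is_err()); // error returned, no panic
}
```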






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-10 Thread via GitHub


mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138343547


##
native/core/benches/shuffle_writer.rs:
##
@@ -66,10 +67,40 @@ fn criterion_benchmark(c: &mut Criterion) {
 CompressionCodec::Zstd(6),
 ] {
 group.bench_function(
-format!("shuffle_writer: end to end (compression = 
{compression_codec:?}"),
+format!("shuffle_writer: end to end (compression = 
{compression_codec:?})"),
+|b| {
+let ctx = SessionContext::new();
+let exec = create_shuffle_writer_exec(
+compression_codec.clone(),
+CometPartitioning::Hash(vec![Arc::new(Column::new("a", 
0))], 16),
+);
+b.iter(|| {
+let task_ctx = ctx.task_ctx();
+let stream = exec.execute(0, task_ctx).unwrap();
+let rt = Runtime::new().unwrap();
+rt.block_on(collect(stream)).unwrap();
+});
+},
+);
+}
+
+for partitioning in [
+CometPartitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+CometPartitioning::RangePartitioning(

Review Comment:
   Added RangePartitioning benchmark here.






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-09 Thread via GitHub


andygrove commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2957023096

   I ran fresh benchmarks, but I do not see any change in performance. Perhaps 
the range partitioning shuffles are not a significant cost in these benchmarks.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-09 Thread via GitHub


mbutrovich commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2135992681


##
native/core/benches/shuffle_writer.rs:
##
@@ -42,20 +45,18 @@ fn criterion_benchmark(c: &mut Criterion) {
 CompressionCodec::Zstd(1),
 CompressionCodec::Zstd(6),
 ] {
-for enable_fast_encoding in [true, false] {

Review Comment:
   Remove this `enable_fast_encoding` loop since we don't have that flag 
anymore after #1703.



##
native/core/benches/shuffle_writer.rs:
##
@@ -42,20 +45,18 @@ fn criterion_benchmark(c: &mut Criterion) {
 CompressionCodec::Zstd(1),
 CompressionCodec::Zstd(6),
 ] {
-for enable_fast_encoding in [true, false] {

Review Comment:
   Removed this `enable_fast_encoding` loop since we don't have that flag 
anymore after #1703.






Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-08 Thread via GitHub


andygrove commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2954116663

   It would be interesting to use our new tracing feature to compare on-heap vs 
off-heap memory usage with range partitioning supported natively versus falling 
back to Spark.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-07 Thread via GitHub


mbutrovich commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952887778

   > I ran TPC-H benchmarks and saw shuffles with range partitioning run 
natively. I did not see any difference in performance compared to the last set 
of benchmarks I ran some time ago, but I have not compared to the main branch 
yet.
   
   Thanks Andy. I'm doing some pretty inefficient things to get around ownership 
issues with `Rows`, `Vec`s, etc. that I aim to improve, so performance should 
get better. The shuffle_writer microbenchmark currently shows range partitioning 
taking almost twice as long as hash partitioning on my laptop, but I aim to fix 
that.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-07 Thread via GitHub


andygrove commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952814148

   I ran TPC-H benchmarks and saw shuffles with range partitioning run 
natively. I did not see any difference in performance compared to the last set 
of benchmarks I ran some time ago, but I have not compared to the main branch 
yet.





Re: [PR] feat: support RangePartitioning with native shuffle [datafusion-comet]

2025-06-07 Thread via GitHub


codecov-commenter commented on PR #1862:
URL: 
https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2952754584

   ## 
[Codecov](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `64.70588%` with `6 lines` in your changes 
missing coverage. Please review.
   > Project coverage is 32.15%. Comparing base 
[(`f09f8af`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/f09f8af64c6599255e116a376f4f008f2fd63b43?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`45cafd2`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/45cafd20dbca6cdff0da8e666112edd9e85b64c1?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 247 commits behind head on main.
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[.../scala/org/apache/comet/serde/QueryPlanSerde.scala](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?src=pr&el=tree&filepath=spark%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fcomet%2Fserde%2FQueryPlanSerde.scala&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9jb21ldC9zZXJkZS9RdWVyeVBsYW5TZXJkZS5zY2FsYQ==)
 | 0.00% | [1 Missing and 2 partials :warning: 
](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   | 
[...t/execution/shuffle/CometNativeShuffleWriter.scala](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?src=pr&el=tree&filepath=spark%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fspark%2Fsql%2Fcomet%2Fexecution%2Fshuffle%2FCometNativeShuffleWriter.scala&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c3Bhcmsvc3JjL21haW4vc2NhbGEvb3JnL2FwYWNoZS9zcGFyay9zcWwvY29tZXQvZXhlY3V0aW9uL3NodWZmbGUvQ29tZXROYXRpdmVTaHVmZmxlV3JpdGVyLnNjYWxh)
 | 78.57% | [2 Missing and 1 partial :warning: 
](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   Additional details and impacted files
   
   
   ```diff
   @@              Coverage Diff               @@
   ##             main    #1862        +/-    ##
   =============================================
   - Coverage     56.12%   32.15%    -23.98%
   + Complexity      976      653       -323
   =============================================
     Files           119      130        +11
     Lines         11743    12674       +931
     Branches       2251     2362       +111
   =============================================
   - Hits           6591     4075      -2516
   - Misses         4012     7804      +3792
   + Partials       1140      795       -345
   ```
   
   
   
   [:umbrella: View full report in Codecov by 
Sentry](https://app.codecov.io/gh/apache/datafusion-comet/pull/1862?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   

