Github user sddyljsx commented on the issue:
https://github.com/apache/spark/pull/21859
@felixcheung
Thanks for the review.
**1. How small is 'small':**
This optimization applies when the data sampled by the RangePartitioner
covers all the data to sort.
The size of the sampled data is:
```
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
```
```
val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
  .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
  .intConf
  .createWithDefault(200)
```
```
val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
  buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
    .internal()
    .doc("Number of points to sample per partition in order to determine the range boundaries" +
      " for range partitioning, typically used in global sorting (without limit).")
    .intConf
    .createWithDefault(100)
```
The default value of SHUFFLE_PARTITIONS is 200 and the default value of
RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION is 100, so by default the total
sample size is 200 x 100 x 3 = 60,000 rows.
**So when each partition holds fewer than 60,000 / partitions.length rows to
sort, the sample contains every row and this optimization applies.**
In my case, 'select * from order where order_status = 9 order by
order_id', the original dataset has 10 partitions and 20,808,930 rows in
total, but after filtering on 'order_status = 9' only 18,610 rows remain
(1,861 per partition). The sample size per partition is 60,000 / 10 = 6,000.
Since 6,000 is larger than 1,861, the sampled data covers all the data to
sort.
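The arithmetic above can be checked with a small standalone sketch. It only
mirrors the two quoted RangePartitioner lines in plain Scala; the object name,
the method names and the `sampleCoversAll` helper are hypothetical and are not
part of Spark or of this PR.

```scala
object SampleCoverage {
  // Same formula as the quoted RangePartitioner code; parameter names are illustrative.
  def sampleSizePerPartition(
      samplePointsPerPartitionHint: Int,
      shufflePartitions: Int,
      inputPartitions: Int): Int = {
    // Cast to double to avoid overflowing ints or longs
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * shufflePartitions, 1e6)
    math.ceil(3.0 * sampleSize / inputPartitions).toInt
  }

  // Hypothetical helper: the sample covers everything when no input partition
  // holds more rows than the per-partition sample size.
  def sampleCoversAll(rowsPerPartition: Seq[Long], perPartitionSample: Int): Boolean =
    rowsPerPartition.forall(_ <= perPartitionSample)

  def main(args: Array[String]): Unit = {
    // Defaults from the configs above: 100 sample points, 200 shuffle partitions,
    // and the 10 input partitions from the example query.
    val perPartition = sampleSizePerPartition(100, 200, 10)
    println(perPartition)                                        // 6000
    println(sampleCoversAll(Seq.fill(10)(1861L), perPartition))  // true
  }
}
```

This assumes the filtered rows are spread evenly over the 10 partitions, as in
the example; with skewed partitions the check has to hold for the largest one.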
The original logic executes the file scan twice: first to compute the
RangePartitioner's rangeBounds by sampling the data, and then again to read
the whole data. But in this case the first file scan has already read the
whole data, so **there is no need to do the second one**. This is the purpose
of the optimization.
**2. Tests**
It can be tested by executing SQL like 'select * from a order by a.b'
on a small dataset.
A unit test has been added in SQLQuerySuite.
**3. Benchmark**
This optimization works well when the original dataset is large but the
data to sort is small after filtering. Compared to the original logic, it
should in theory save the time spent on the second file scan and filter.
A benchmark has been added in SmallDataSortBenchmark.
The JSON dataset has 100,000,000 rows with the schema (key, value). Keys are
in the range (0, 100,000) and each key has values in (0, 1000), so the query
'select * from src where key = $key order by value' returns 1,000 rows.
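As a rough illustration of the data shape only, here is a scaled-down sketch
in plain Scala (no Spark). It is not the code in SmallDataSortBenchmark; the
`Row` case class, the helper names, the reduced key count of 100 and the
half-open ranges are all assumptions made for the example.

```scala
object MiniBenchmarkData {
  final case class Row(key: Int, value: Int)

  // Every key in [0, numKeys) gets every value in [0, valuesPerKey).
  def generate(numKeys: Int, valuesPerKey: Int): Seq[Row] =
    for (k <- 0 until numKeys; v <- 0 until valuesPerKey) yield Row(k, v)

  // In-memory analogue of: select * from src where key = $key order by value
  def query(rows: Seq[Row], key: Int): Seq[Row] =
    rows.filter(_.key == key).sortBy(_.value)

  def main(args: Array[String]): Unit = {
    // 100 keys instead of 100,000 so the example fits comfortably in memory.
    val rows = generate(numKeys = 100, valuesPerKey = 1000)
    val result = query(rows, key = 42)
    println(s"${rows.size} rows total, ${result.size} rows after filtering")
  }
}
```

The point is that the filter keeps only one key's worth of rows, so the data
to sort stays small no matter how many keys the full dataset contains.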
Before optimization:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
speed up sort when the dataset is small:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort                                        179881 / 182216          0.6        1798.8       1.0X
```
After optimization:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
speed up sort when the dataset is small:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort                                        127424 / 132435          0.8        1274.2       1.0X
```
With this optimization the query takes about 30% less time (best time drops
from 179,881 ms to 127,424 ms).