c21 commented on pull request #35552:
URL: https://github.com/apache/spark/pull/35552#issuecomment-1044101219
Thanks @HeartSaVioR for the update!
> There is a much simpler case: suppose table is hash partitioned by k1, with a small number of partitions and the data is skewed. If we run GROUP BY k1, k2 against the table, Spark doesn't add a shuffle (expected) and the query will run super slowly.
By `table is hash partitioned by k1`, are you referring to a table bucketed on
`k1`, or some other scenario? I understand the query can get arbitrarily
complicated depending on the business logic in each individual environment, but
I would like to dig more into it. If data skew happens in a bucketed table, I
think we have a couple of options:
* Data skew in a bucketed table will cause slowness not only in aggregate,
but also in all other operators, such as shuffle join. I think we'd better fix
the issue at the root, i.e. the table scan operator. The table scan operator can
decide whether to set its `outputPartitioning` to
`HashPartitioning(bucketed_keys)` based on some statistics. One option
is to check the bucket file sizes, and optionally the row count per file if
available, during query planning. If the file sizes/row counts are skewed
(a certain bucket file is too large or has too many rows), do not populate
`outputPartitioning`, i.e. do not pass the bucketing output partitioning down
through the query plan. This would not need the `requiredChildDistribution`
change for the aggregate operator, and would fix the issue for other operators
like join as well.
* Data skew in the middle of the plan. Then we can use an AQE strategy to
handle it. For example, if the shuffle before an aggregate on a subset of the
keys has data skew (i.e. `shuffle(x)` before `aggregate(x, y)`), we can add an
AQE rule that dynamically inserts a `shuffle(x, y)` after `shuffle(x)` but
before `aggregate(x, y)`, based on the runtime shuffle size of `shuffle(x)`.
This also solves the issue, and has the benefit of auto-tuning the query
adaptively, without manually setting the config per query/job (manually tuning
configs per query is not scalable, right?).
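The bucket-size check in the first option could look something like the sketch below. This is plain Python illustrating only the decision logic, not Spark internals; the function name, the `bucket_sizes` statistic, and the thresholds are all hypothetical (the thresholds are modeled loosely on AQE's skew-join factor/size pair):

```python
def should_propagate_bucket_partitioning(bucket_sizes,
                                         skew_factor=4.0,
                                         min_size=64 * 1024 * 1024):
    """Decide whether a scan should report HashPartitioning(bucketed_keys).

    bucket_sizes: per-bucket file sizes in bytes (a hypothetical planning-time
    statistic). Returns False when one bucket dominates, so planning falls
    back to a regular shuffle instead of reusing the skewed bucketing.
    """
    if not bucket_sizes:
        return True
    largest = max(bucket_sizes)
    median = sorted(bucket_sizes)[len(bucket_sizes) // 2]
    # A bucket is "skewed" only if it is both large in absolute terms and
    # much larger than the typical bucket.
    skewed = largest > min_size and largest > skew_factor * max(median, 1)
    return not skewed
```

With balanced buckets the scan would keep advertising its bucketing; with one oversized bucket it would stay silent and let the planner insert a shuffle.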
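To see why inserting the extra `shuffle(x, y)` helps in the second option, here is a tiny illustrative simulation (plain Python, not Spark; the data shape and partition count are made up): hashing only by `k1` sends every row of the hot key to a single reducer, while hashing by `(k1, k2)` spreads them out.

```python
from collections import Counter

def partition_load(rows, keys, num_partitions):
    """Rows per reduce partition when hash-partitioning by the given keys."""
    load = Counter()
    for row in rows:
        load[hash(tuple(row[k] for k in keys)) % num_partitions] += 1
    return load

# Skewed data: one hot k1 value with many distinct k2 values under it,
# plus a long tail of cold k1 values.
rows = [{"k1": "hot", "k2": i, "v": 1} for i in range(10_000)]
rows += [{"k1": f"cold{i}", "k2": 0, "v": 1} for i in range(100)]

by_k1 = partition_load(rows, ["k1"], 5)        # shuffle(x): all hot rows collide
by_both = partition_load(rows, ["k1", "k2"], 5)  # shuffle(x, y): hot rows spread
```

The max partition load under `shuffle(x)` is at least the full hot-key row count, while under `shuffle(x, y)` it drops to roughly the average, which is exactly what the dynamically inserted shuffle would buy.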
> We seem to consider this as a "trade off", but it is not going to be acceptable if the elapsed times of the query are from mins to hours. We are not yet very smart about choosing the best behavior automatically for specific query, so we would like to provide a new config to end users to tune the behavior manually.
I double-checked the query plan you added in the unit test:
```scala
val grouped = df
  // repartition by sub group keys which satisfies ClusteredDistribution(group keys)
  .repartition($"key1")
  .groupBy($"key1", $"key2")
  .agg(sum($"value"))
```
With the newly added config in this PR enabled:
```
*(3) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
+- Exchange hashpartitioning(key1#10, key2#11, 5), ENSURE_REQUIREMENTS, [id=#25]
   +- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
      +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#21]
         +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
            +- *(1) LocalTableScan [_1#3, _2#4, _3#5]
```
With the newly added config in this PR disabled:
```
*(2) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
+- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
   +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#19]
      +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
         +- *(1) LocalTableScan [_1#3, _2#4, _3#5]
```
So the partial aggregate `HashAggregate(keys=[key1#10, key2#11],
functions=[partial_sum(value#12)] ...` is always there, whether the config is
enabled or disabled. I doubt this feature can help the performance under data
skew, because as you can see we have to do the partial aggregate on the skewed
partition anyway, and the partial aggregate will be the major cost here. Could
you share a concrete aggregate query example with data skew?
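To make the cost argument concrete: a hash-based partial aggregate touches every input row of the skewed partition exactly once, regardless of which exchange follows it. A minimal illustrative sketch (plain Python with a made-up row format, not Spark's implementation):

```python
def partial_agg(rows, keys):
    """Hash-based partial aggregation within a single partition.

    Every input row is processed once; the output size depends only on the
    number of distinct grouping-key combinations, not on the shuffle that
    comes afterwards.
    """
    acc = {}
    for row in rows:
        k = tuple(row[key] for key in keys)
        acc[k] = acc.get(k, 0) + row["v"]
    return acc

# A skewed partition: one hot k1 value spread over 1,000 distinct k2 values.
partition = [{"k1": "hot", "k2": i % 1000, "v": 1} for i in range(10_000)]
partial = partial_agg(partition, ("k1", "k2"))
# All 10,000 rows are scanned either way; 1,000 partial results come out.
```

Whether the final exchange is on `(k1)` or `(k1, k2)`, this scan over the skewed partition happens first and dominates the cost.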
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]