c21 commented on pull request #35552:
URL: https://github.com/apache/spark/pull/35552#issuecomment-1044101219


   Thanks @HeartSaVioR for the update!
   
   > There is a much simpler case: suppose table is hash partitioned by k1, 
with a small number of partitions and the data is skewed. If we run GROUP BY 
k1, k2 against the table, Spark doesn't add a shuffle (expected) and the query 
will run super slowly.
   
   By `table is hash partitioned by k1`, are you referring to a table bucketed on `k1`, or some other scenario? I understand the query can get arbitrarily complicated depending on the business logic in each individual environment, but I would like to dig more into it. If data skew happens in a bucketed table, I think we have a couple of options:
   
   * Data skew in a bucketed table will cause slowness not only in aggregate, but also in all other operators, such as shuffle join. I think we'd better fix the issue at the root, i.e. the table scan operator. The table scan operator can decide whether to set its `outputPartitioning` to `HashPartitioning(bucketed_keys)` based on statistics. One option is to check the bucket file sizes, and optionally the row count per file if available at query planning time. If the file size/row count is skewed (a certain bucket file is too large or has too many rows), do not populate `outputPartitioning`, i.e. do not pass the bucketing output partitioning down through the query plan. This would not need the `requiredChildDistribution` change for the aggregate operator, and would fix the issue for other operators like join as well (see the first sketch after this list).
   * Data skew in the middle of the plan. Here we can use an AQE strategy to handle it. For example, if the shuffle before an aggregate on a subset of keys has data skew (i.e. `shuffle(x)` before `aggregate(x, y)`), we can add an AQE rule that dynamically inserts a `shuffle(x, y)` after `shuffle(x)` but before `aggregate(x, y)`, based on the runtime shuffle size of `shuffle(x)`. This would also solve the issue, and has the benefit of auto-tuning the query adaptively, without manually setting the config per query/job (manually tuning configs per query is not scalable, right?). See the second sketch after this list for what the injected shuffle corresponds to.
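   
   To make the first option more concrete, here is a minimal sketch of the skew check I have in mind. This is not Spark internals code; the function name, the skew factor, and the median-based heuristic are all made up for illustration, and the real decision would live in the file scan planning path:
   
   ```scala
   // Hypothetical heuristic: given the size of each bucket file, decide whether the
   // scan should still report HashPartitioning(bucketed_keys) as its outputPartitioning.
   // skewFactor is an illustrative threshold, not an existing Spark config.
   def shouldReportBucketing(bucketFileSizes: Seq[Long], skewFactor: Double = 4.0): Boolean = {
     if (bucketFileSizes.isEmpty) {
       false
     } else {
       val sorted = bucketFileSizes.sorted
       val median = sorted(sorted.length / 2)
       val max = sorted.last
       // If one bucket file is far larger than the median, treat the table as skewed and
       // do not expose the bucketing, so a shuffle can rebalance the data downstream.
       max <= skewFactor * math.max(median, 1L)
     }
   }
   
   // shouldReportBucketing(Seq(10L, 12L, 11L))        => true  (keep bucketing, avoid shuffle)
   // shouldReportBucketing(Seq(10L, 12L, 11L, 900L))  => false (drop outputPartitioning, allow shuffle)
   ```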
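   
   And for the second option, this is roughly what the AQE-injected shuffle would correspond to, expressed at the DataFrame level (the real rule would rewrite the physical plan at runtime; this sketch just assumes a `SparkSession` `spark` with `spark.implicits._` and `org.apache.spark.sql.functions.sum` in scope, and a `df` with columns `key1`/`key2`/`value`):
   
   ```scala
   // shuffle(key1) turns out skewed at runtime, so an extra shuffle on (key1, key2) is
   // added before the aggregate -- written here explicitly as a second repartition.
   val rebalanced = df
     .repartition($"key1")            // original shuffle(x)
     .repartition($"key1", $"key2")   // extra shuffle(x, y) the AQE rule would inject
     .groupBy($"key1", $"key2")
     .agg(sum($"value"))
   ```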
   
   > We seem to consider this as a "trade off", but it is not going to be 
acceptable if the elapsed times of the query are from mins to hours. We are not 
yet very smart about choosing the best behavior automatically for specific 
query, so we would like to provide a new config to end users to tune the 
behavior manually.
   
   I double-checked the query plan you added in the unit test:
   
   ```scala
   val grouped = df
     // repartition by sub group keys which satisfies ClusteredDistribution(group keys)
     .repartition($"key1")
     .groupBy($"key1", $"key2")
     .agg(sum($"value"))
   ```
   
   Plan with the newly added config in this PR enabled:
   
   ```
   *(3) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
   +- Exchange hashpartitioning(key1#10, key2#11, 5), ENSURE_REQUIREMENTS, [id=#25]
      +- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
         +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#21]
            +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
               +- *(1) LocalTableScan [_1#3, _2#4, _3#5]
   ```
   
   Plan with the newly added config in this PR disabled:
   
   ```
   *(2) HashAggregate(keys=[key1#10, key2#11], functions=[sum(value#12)], output=[key1#10, key2#11, sum(value)#20L])
   +- *(2) HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)], output=[key1#10, key2#11, sum#25L])
      +- Exchange hashpartitioning(key1#10, 5), REPARTITION_BY_COL, [id=#19]
         +- *(1) Project [_1#3 AS key1#10, _2#4 AS key2#11, _3#5 AS value#12]
            +- *(1) LocalTableScan [_1#3, _2#4, _3#5]
   ```
   
   So the partial aggregate `HashAggregate(keys=[key1#10, key2#11], functions=[partial_sum(value#12)] ...` is always there whether the config is enabled or disabled. I doubt this feature can help the performance of data skew, because as you can see we have to do the partial aggregate on the skewed partition anyway, and that partial aggregate will be the major cost here. Could you share with us a concrete aggregate query example with data skew? Something along the lines of the sketch below is the shape of example I have in mind.
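   
   For reference, a purely hypothetical way to construct the kind of skewed input I mean, so the two plans above can be compared on real data (the row counts, key names, and the "hot" key are made up for illustration):
   
   ```scala
   import spark.implicits._
   import org.apache.spark.sql.functions.sum
   
   // ~99% of the rows share a single key1 value, so repartition($"key1") puts them in one partition.
   val skewed = (1 to 1000000).map { i =>
     val key1 = if (i % 100 == 0) s"k$i" else "hot"
     (key1, i % 1000, i.toLong)
   }.toDF("key1", "key2", "value")
   
   val grouped = skewed
     .repartition($"key1")
     .groupBy($"key1", $"key2")
     .agg(sum($"value"))
   ```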

