Hi,
Have you tried
https://spark.apache.org/docs/latest/sql-performance-tuning.html#spliting-skewed-shuffle-partitions
?
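That adaptive splitting is driven by a handful of AQE settings. A rough sketch of enabling it (the property names are from the tuning page above; the sizes are only example values you would tune for your data):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("skew-tuning").getOrCreate()
// Adaptive Query Execution must be on for any of the skew handling to kick in
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Split oversized shuffle partitions produced by rebalance operations (e.g. the REBALANCE hint)
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
// Target size AQE aims for when splitting (or coalescing) shuffle partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
// If the skew shows up in joins, the skew-join settings on the same page apply as well
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")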
Another way of handling the skew is to split the task into multiple (2 or
more) stages, using a random salt as part of the key in the intermediate
stages. In your case, something like:
import org.apache.spark.sql.functions.{col, udf}
import scala.util.Random

val maxSalt = 20 // in my experience ~2 * sqrt(numPartitions) works well for 2 stages
val randUdf = udf(() => Random.nextInt(maxSalt)) // nextInt(maxSalt) avoids the negative values a raw % can give
df.withColumn("salt", randUdf())
  .repartition(numPartitions, col("salt"), col("key1") /* , other key columns */)
  .drop("salt")
  .sortWithinPartitions(col("key1") /* ... */)
  .yourDfOperation(...) // the salt spreads hot keys across partitions for this first pass
  .repartition(numPartitions, col("key1") /* ... */) // second pass restores the per-key partitioning and ordering
  .sortWithinPartitions(col("key1") /* ... */)
  .yourDfOperation(...)
Another optimization you could try is to perform your DataFrame operation
before the initial repartition, if your input already has a good number of
fairly random partitions.
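Purely as an illustration (df, numPartitions, and the length-of-value
transformation are hypothetical stand-ins for your data and for
yourDfOperation):

import org.apache.spark.sql.functions.{col, length}

// Hypothetical stand-in for yourDfOperation: a per-row transformation that doesn't need the keys sorted yet
val processed = df.withColumn("value_len", length(col("value")))

// Repartition and sort only once, after the heavy work has already run on the existing partitions
processed
  .repartition(numPartitions, col("key1"))
  .sortWithinPartitions(col("key1"))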
Raghavendra
On Sat, Aug 17, 2024 at 12:33 PM Karthick <[email protected]>
wrote:
> Hi Team,
>
> I'm using repartition and sortWithinPartitions to maintain field-based
> ordering across partitions, but I'm facing data skewness among the
> partitions. I have 96 partitions, and I'm working with 500 distinct keys.
> While reviewing the Spark UI, I noticed that a few partitions are
> underutilized while others are overutilized.
>
> This seems to be a hashing problem. Can anyone suggest a better hashing
> technique or approach to mitigate this issue?
>
> Thanks in advance for your help.
>