sunchao commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3260099787
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -403,6 +410,24 @@ object ShuffleExchangeExec {
       case h: HashPartitioning =>
         val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
         row => projection(row).getInt(0)
+      case h: NullAwareHashPartitioning =>
+        val partitionIdProjection =
+          UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
+        val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes)
+        var nullKeyPartition =
+          new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)
Review Comment:
Good question. `partitionId() % numPartitions` would be deterministic and
can be balanced when all map partitions contribute similarly, but it can
correlate badly with the subset of upstream partitions that actually contain
null keys. For example, if only partitions 0, 8, 16, ... contain nulls and
there are 8 reducers, every one of those tasks would start at reducer 0,
because each of those partition IDs is 0 modulo 8.
`XORShiftRandom(partitionId).nextInt(numPartitions)` avoids that pattern and
distributes the null rows more evenly across reducers.
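
To make the 0, 8, 16, ... case concrete, here is a minimal standalone sketch
that prints the starting reducer under both schemes. It is not the code in
this PR: `NullKeyStartOffsets`, `moduloStart`, and `mixedStart` are
hypothetical names, and `scala.util.hashing.byteswap32` is only a stand-in
for the seed scrambling that `XORShiftRandom` performs, chosen so the sketch
runs without Spark on the classpath.

```scala
import scala.util.hashing.byteswap32

object NullKeyStartOffsets {
  // Starting reducer if each map task used its partition ID modulo the reducer count.
  def moduloStart(partitionId: Int, numPartitions: Int): Int =
    partitionId % numPartitions

  // Assumption: a stand-in for XORShiftRandom(partitionId).nextInt(numPartitions).
  // It scrambles the partition ID first, then reduces it to a reducer index.
  // floorMod keeps the result non-negative even when the scrambled value is negative.
  def mixedStart(partitionId: Int, numPartitions: Int): Int =
    java.lang.Math.floorMod(byteswap32(partitionId), numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // Only these upstream partitions contain null keys, as in the example above.
    val partitionsWithNulls = Seq(0, 8, 16, 24, 32, 40, 48, 56)
    for (p <- partitionsWithNulls) {
      val modulo = moduloStart(p, numPartitions)
      val mixed = mixedStart(p, numPartitions)
      println(s"partition $p: modulo start = $modulo, mixed start = $mixed")
    }
  }
}
```

The modulo column is 0 for every listed partition, which is the correlation
described above. The mixed column depends on the scrambling function, so the
exact values will differ from what `XORShiftRandom` produces in Spark.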
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]