sunchao commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3260099787


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -403,6 +410,24 @@ object ShuffleExchangeExec {
       case h: HashPartitioning =>
         val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
         row => projection(row).getInt(0)
+      case h: NullAwareHashPartitioning =>
+        val partitionIdProjection =
+          UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
+        val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes)
+        var nullKeyPartition =
+          new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)

Review Comment:
   Good question. `partitionId() % numPartitions` is deterministic, and it
balances well when every map partition contributes a similar number of
null-key rows. However, it can correlate badly with the subset of upstream
partitions that actually contain null keys. For example, if only partitions
0, 8, 16, ... contain nulls and there are 8 reducers, every one of those map
tasks starts at reducer 0. Seeding with
`new XORShiftRandom(partitionId).nextInt(numPartitions)` is still
deterministic per map partition, but it breaks that alignment, so the null
rows should spread more evenly across reducers.
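
To make the 0, 8, 16, ... example concrete, here is a minimal sketch that compares the two ways of picking a starting reducer. It is not the PR's code: `XORShiftRandom` is internal to Spark, so `hashedStart` is a hypothetical stand-in that scrambles the partition id with the MurmurHash3 32-bit finalizer before reducing it. The object name, method names, and partition list are all illustrative assumptions.

```scala
object NullKeyStartDemo {
  // Plain modulo: map partitions whose ids share a residue all start at the same reducer.
  def moduloStart(partitionId: Int, numPartitions: Int): Int =
    Math.floorMod(partitionId, numPartitions)

  // MurmurHash3 32-bit finalizer, used here only to scramble the partition id.
  // Assumption: this stands in for seeding Spark's XORShiftRandom; it is not the same generator.
  private def fmix32(x: Int): Int = {
    var h = x
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  // Hypothetical stand-in for `new XORShiftRandom(partitionId).nextInt(numPartitions)`:
  // deterministic per partition id, but no longer aligned with the id's residue.
  def hashedStart(partitionId: Int, numPartitions: Int): Int =
    Math.floorMod(fmix32(partitionId), numPartitions)

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // Only these upstream partitions are assumed to contain null join keys.
    val nullBearing = Seq(0, 8, 16, 24, 32, 40, 48, 56)
    println("modulo starts: " + nullBearing.map(moduloStart(_, numPartitions)).mkString(", "))
    println("hashed starts: " + nullBearing.map(hashedStart(_, numPartitions)).mkString(", "))
  }
}
```

With the modulo rule every listed partition starts at reducer 0. The scrambled variant stays reproducible for a given partition id, and its starting reducer no longer depends only on the id's residue modulo the reducer count.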
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

