peter-toth commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3265009056


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -345,6 +390,42 @@ case class CoalescedHashPartitioning(from: 
HashPartitioning, partitions: Seq[Coa
     copy(from = from.copy(expressions = newChildren))
 }
 
+case class CoalescedNullAwareHashPartitioning(
+    from: NullAwareHashPartitioning,
+    partitions: Seq[CoalescedBoundary]) extends HashPartitioningLike {
+
+  override def expressions: Seq[Expression] = from.expressions
+
+  override def satisfies0(required: Distribution): Boolean = {

Review Comment:
   This body is identical to `NullAwareHashPartitioning.satisfies0` at line 340 
— same outer `UnspecifiedDistribution`/`AllTuples`/`_ => false` match, same 
`ClusteredDistribution` inner match guarded on `allowNullKeySpreading`, same 
`requireAllClusterKeys` branching. This is the same kind of duplication 
addressed elsewhere in this PR by extracting 
`HashShuffleSpecCompatibility.isCompatible` (lines 944-955).
   
   Two cleaner shapes:
   - Lift the inner block to a private helper, e.g. `private def 
nullAwareSatisfies0(exprs, n, required)` shared by both classes.
   - Or just delegate: since boundaries don't change satisfaction semantics for 
the `allowNullKeySpreading` contract, 
`CoalescedNullAwareHashPartitioning.satisfies0(required)` is essentially 
`from.satisfies0(required)` *except* for the `AllTuples` case where 
`numPartitions` differs — that single divergence is easy to handle inline.
   
   Side note: both overrides skip the `StatefulOpClusteredDistribution` case 
that `HashPartitioningLike.satisfies0` handles. Currently unreachable 
(streaming joins use `StatefulOpClusteredDistribution`, not 
`ClusteredDistribution`, so they never opt into `allowNullKeySpreading`), but a 
one-line comment that the omission is deliberate would help the next reader.
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala:
##########
@@ -28,6 +29,15 @@ import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Dist
 trait ShuffledJoin extends JoinCodegenSupport {
   def isSkewJoin: Boolean
 
+  private lazy val canSpreadNullJoinKeys: Boolean = {

Review Comment:
   The gate opts in based on join type alone, ignoring whether the shuffle keys 
are actually nullable. For an outer join on non-nullable keys (e.g. `f.k = d.k` 
where both `k` are NOT NULL — common after a NOT NULL filter or on 
schema-non-null columns), the new path:
   
   1. Adds a per-row `joinKeys.anyNull()` check in 
`ShuffleExchangeExec.getPartitionKeyExtractor` that always returns false.
   2. Produces `NullAwareHashPartitioning` as the join's output partitioning, 
which doesn't satisfy ordinary `ClusteredDistribution`. The 
`AdaptiveQueryExecSuite` diff in this PR (`optimizeOutRepartition = false` 
cases around lines 2079-2127) shows the cost — a downstream 
`df.repartition($"b")` is no longer collapsed even though the underlying 
NULL-skew problem can never have existed.
   
   Two options worth considering:
   - Gate also on `leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)` 
so a non-nullable-key outer join falls back to plain `HashPartitioning`.
   - If the simpler shape is preferred, add a sentence to the lazy val's 
comment explicitly calling out the trade-off (skew reduction vs. potentially 
unnecessary downstream re-shuffle / lost `optimizeOutRepartition`) so future 
readers don't read it as an oversight.
   



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala:
##########
@@ -59,6 +59,27 @@ class ExchangeSuite extends SharedSparkSession {
     )
   }
 
+  test("null-aware hash shuffle spreads identical NULL keys from one mapper") {
+    val input = 
Seq.fill(64)(Tuple1(null.asInstanceOf[Integer])).toDF("k").coalesce(1)
+    val plan = input.queryExecution.executedPlan
+    val exchange = ShuffleExchangeExec(NullAwareHashPartitioning(plan.output, 
4), plan)
+    val partitionSizes = exchange.execute().collectPartitions().map(_.length)
+
+    assert(partitionSizes.sorted === Array(16, 16, 16, 16))
+  }
+
+  test("null-aware hash shuffle preserves retry determinism with local 
sorting") {
+    withSQLConf(SQLConf.SORT_BEFORE_REPARTITION.key -> "true") {

Review Comment:
   Retry-determinism for the new partitioning has two paths and only one is 
covered: with `SORT_BEFORE_REPARTITION = true` the local sort makes row order 
deterministic; with `false`, the code in `ShuffleExchangeExec` (line ~497-498) 
instead relies on `isOrderSensitive = true` to propagate the parent's 
determinism level. Only the sorted half is exercised here.
   
   A second test running the same 
`ShuffleExchangeExec(NullAwareHashPartitioning, ...)` with 
`SORT_BEFORE_REPARTITION = false` and asserting that `outputDeterministicLevel` 
inherits the parent's level would catch a future regression that drops 
`isNullAwareHashPartitioning` from the `isOrderSensitive` clause (which would 
silently make retries unsafe for the unsorted path). Mirroring the structure of 
the existing test is enough — no new infrastructure required.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to