viirya commented on PR #55927:
URL: https://github.com/apache/spark/pull/55927#issuecomment-4510340054

   Thanks @sunchao for the iterations — design is in good shape and the safety 
reasoning around `<=>` / `NullType` is solid. Two follow-ups I'd suggest before 
merge, plus one optional readability nit.
   
   ### 1. Narrow `canSpreadNullJoinKeys` to the preserved side
   
   `ShuffledJoin.canSpreadNullJoinKeys` currently opts in whenever *either* 
side has nullable join keys. But the skew it targets only ever appears on the 
**preserved** side:
   
   - `LeftOuter`: only left-side NULL-keyed rows are carried through 
(right-side NULLs are filtered by `=` and never emitted).
   - `RightOuter`: symmetric, only right.
   - `FullOuter`: both sides.
   
   When only the non-preserved side has nullable keys, opting in has zero skew 
benefit but still pays:
   
   - per-row `anyNull()` branch in `getPartitionKeyExtractor` on the hot path,
   - a local sort under `sortBeforeRepartition=true` (defensive, since the 
round-robin path won't actually fire if there are no NULLs, but still paid).
   
   Note the *downstream* re-shuffle cost (the one reflected by the 
`optimizeOutRepartition` test diffs) is **not** affected here, because 
`ShuffledJoin.outputPartitioning` only exposes the preserved side's 
partitioning (`left.outputPartitioning` for `LeftOuter`, etc.). So this is 
purely a "no-benefit but small extra cost" case.
   
   Suggested tightening — keeps both sides opting in together, no change to 
shuffle-spec compatibility rules:
   
   ```scala
   private lazy val canSpreadNullJoinKeys: Boolean = {
     // The skew this targets only appears on the preserved side of an outer 
join:
     // non-preserved NULL-keyed rows are filtered out by `=` and never 
emitted, so their
     // nullability cannot create reducer skew. This also mirrors which side's
     // outputPartitioning is exposed downstream by ShuffledJoin.
     val preservedSideHasNullableKeys = joinType match {
       case LeftOuter => leftKeys.exists(_.nullable)
       case RightOuter => rightKeys.exists(_.nullable)
       case FullOuter => leftKeys.exists(_.nullable) || 
rightKeys.exists(_.nullable)
       case _ => false
     }
     conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) && 
preservedSideHasNullableKeys
   }
   ```
   
   I considered the more aggressive variant — letting the two sides opt in 
independently (`allowNullKeySpreading=true` only on the preserved side) — but 
that would require extending `HashShuffleSpec` ↔ `NullAwareHashShuffleSpec` 
compatibility to accept the asymmetric one-side-opted-in case, which is a much 
larger blast radius. The "both sides together, gated by preserved-side 
nullability" form above gets the practical benefit without touching 
compatibility rules.
   
   ### 2. Expand the config doc with the trade-offs
   
   The current doc only states the benefit:
   
   > "When true, Spark may spread rows with NULL equi-join keys across shuffle 
partitions for ordinary shuffled outer joins to reduce shuffle skew."
   
   A user enabling the flag may be surprised by behavior the doc doesn't 
mention:
   
   - Only `LEFT/RIGHT/FULL OUTER` equi-joins on nullable keys are affected.
   - The join's output partitioning intentionally does not satisfy a strict 
`ClusteredDistribution`, so a downstream `GROUP BY k` / `OVER (PARTITION BY k)` 
/ another `JOIN ... ON k = ...` will introduce an additional shuffle that 
today's plan would have avoided. The `AdaptiveQueryExecSuite` diffs 
(`optimizeOutRepartition: true → false`) are exactly this cost.
   - When one side is already pre-shuffled as `HashPartitioning`, only the 
other side is reshuffled as `NullAwareHashPartitioning`; the pre-shuffled side 
keeps its NULL skew.
   
   Worth folding these into the `.doc(...)` so the cost is visible at `SET` / 
`EXPLAIN` time, not only by reading the partitioning source.
   
   Also: the PR description's *"How was this patch tested?"* still says 
*"Regenerated the affected TPC-DS plan-stability outputs after the expected 
physical-plan change."* but the patch contains no golden-file changes 
(correctly, since the flag defaults off). Worth removing so future 
archeologists don't go looking.
   
   ### 3. (Optional) Document the cross-file invariants of the null-aware family
   
   The correctness of this feature rests on four invariants that are currently 
maintained in four separate places:
   
   1. **Consumer opt-in only**: only 
`ClusteredDistribution(allowNullKeySpreading = true)` may be satisfied by the 
null-aware family — enforced in `NullAwareHashPartitioning.satisfies0` / 
`CoalescedNullAwareHashPartitioning.satisfies0`.
   2. **No strict-cluster satisfaction**: the family must not satisfy 
`StatefulOpClusteredDistribution`, ordinary `ClusteredDistribution`, or 
`AllTuples` with `n > 1` — enforced by deliberately *not* delegating to 
`super.satisfies0` in `NullAwareHashPartitioning`.
   3. **Non-NULL partition-id parity with `HashPartitioning`**: for any row 
with no NULL key, `ShuffleExchangeExec`'s null-aware extractor must produce the 
same partition id as `HashPartitioning.partitionIdExpression` would. This is 
what makes a `HashShuffleSpec` input compatible with a 
`NullAwareHashShuffleSpec` input when both distributions opt in. Both sites 
currently use `Pmod(CollationAwareMurmur3Hash(keys), numPartitions)`; any 
future change to the hash on either side must be mirrored on the other.
   4. **Symmetric spec compatibility**: `HashShuffleSpec` ↔ 
`NullAwareHashShuffleSpec` are mutually compatible *iff both* distributions 
have `allowNullKeySpreading = true`.
   
   Invariant (3) is the one that's currently fully implicit — there is no 
comment or test that ties the two hash sites together, but breaking the parity 
would silently misalign non-NULL rows from a `HashPartitioning` input against a 
`NullAwareHashPartitioning` input under the "compatible" rule (4). Worth at 
least a one-line comment at the `CollationAwareMurmur3Hash` site in 
`ShuffleExchangeExec` pointing back to 
`HashPartitioning.partitionIdExpression`, and optionally a contract block on 
`NullAwareHashPartitioning` listing the four invariants. I'd be ok with just 
the one-line cross-reference if a full contract block feels too heavy for this 
codebase's conventions.
   
   Items 1 and 2 are merge-blockers in my view (small but user-observable); 
item 3 is a maintainability nit you can take or leave.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to