cloud-fan commented on code in PR #55912:
URL: https://github.com/apache/spark/pull/55912#discussion_r3383597281


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -425,6 +425,110 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     }
   }
 
+  /**
+   * Plans AS-OF joins using a dedicated sort-merge operator when the
+   * conf is enabled.
+   */
+  object AsOfJoinSelection extends Strategy with PredicateHelper {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case j @ AsOfJoin(left, right, asOfCondition, condition, joinType,
+          orderExpression, _) if conf.sortMergeAsOfJoinEnabled =>
+        val (leftKeys, rightKeys, residual) = condition match {
+          case Some(cond) => extractEquiJoinKeys(cond, left, right)
+          case None => (Seq.empty[Expression], Seq.empty[Expression], None)
+        }
+        val (leftAsOf, rightAsOf) = extractAsOfExprs(
+          asOfCondition, orderExpression, left, right)
+
+        joins.SortMergeAsOfJoinExec(
+          leftKeys, rightKeys, leftAsOf, rightAsOf,
+          asOfCondition, orderExpression, joinType, residual,
+          planLater(left), planLater(right)) :: Nil
+      case _ => Nil
+    }
+
+    /**
+     * Extract equi-join key pairs and residual (non-equi) condition
+     * from a conjunction. Only EqualTo is treated as equi-key;
+     * EqualNullSafe is excluded because the Scanner uses
+     * ClusteredDistribution which co-partitions by hash -- null keys
+     * hash to the same partition but the Scanner's BaseOrdering treats
+     * nulls as equal, which would incorrectly match null-keyed rows.

Review Comment:
   Late catch on my side (this comment predates round 4): the rationale here is 
inverted. Since the null-key fix, the scanner *skips* left rows with null 
equi-keys (`SortMergeAsOfJoinExec.scala:248`), so the "would incorrectly match 
null-keyed rows" scenario can no longer happen -- and for `EqualNullSafe`, 
matching null keys would be the *correct* semantics. The actual reason to 
exclude `<=>` is the inverse: the scanner enforces `EqualTo` null semantics, 
which would wrongly drop the null-key matches `<=>` requires; routing it to the 
residual condition keeps it correct. Also, `ClusteredDistribution` is declared 
by the exec node's `requiredChildDistribution`, not the Scanner.
   ```suggestion
        * EqualNullSafe is excluded because the scanner implements EqualTo
        * null semantics (left rows with any null equi-key never match and
        * are skipped or null-padded), whereas EqualNullSafe requires null
        * keys to match. Such predicates fall through to the residual
        * condition, which evaluates them per candidate pair.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to