dtenedor commented on code in PR #55629:
URL: https://github.com/apache/spark/pull/55629#discussion_r3183366692


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
     ],
     "sqlState" : "0A000"
   },
+  "NEAREST_BY_JOIN" : {
+    "message" : [
+      "Invalid nearest-by join."
+    ],
+    "subClass" : {
+      "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+        "message" : [
+          "EXACT nearest-by join is incompatible with the nondeterministic ranking expression <expression>. Use APPROX, or replace the expression with a deterministic one."
+        ]
+      },
+      "NON_ORDERABLE_RANKING_EXPRESSION" : {
+        "message" : [
+          "The ranking expression <expression> of type <type> is not orderable."
+        ]
+      },
+      "NUM_RESULTS_OUT_OF_RANGE" : {
+        "message" : [
+          "The number of results <numResults> must be between <min> and <max>."

Review Comment:
   I imagine this error would surface after the query has executed for a while. 
What should the user do in this case? Is there a specific way they should amend 
their query to make it work the next time? Can we mention this?



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
     ],
     "sqlState" : "0A000"
   },
+  "NEAREST_BY_JOIN" : {
+    "message" : [
+      "Invalid nearest-by join."
+    ],
+    "subClass" : {
+      "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+        "message" : [
+          "EXACT nearest-by join is incompatible with the nondeterministic ranking expression <expression>. Use APPROX, or replace the expression with a deterministic one."
+        ]
+      },
+      "NON_ORDERABLE_RANKING_EXPRESSION" : {
+        "message" : [
+          "The ranking expression <expression> of type <type> is not orderable."

Review Comment:
   Can we suggest one or two alternatives that would satisfy this constraint, 
to help the user move forward quickly?



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -7837,6 +7880,11 @@
           "Referencing a lateral column alias <lca> in window expression <windowExpr>."
         ]
       },
+      "LATERAL_JOIN_NEAREST_BY" : {
+        "message" : [
+          "LATERAL correlation with NEAREST BY clause."

Review Comment:
   This covers explicit LATERAL JOIN. Do we care about lateral column alias 
usage for queries over the results of the nearest-neighbor join as well, or is 
that orthogonal?



##########
docs/sql-ref-syntax-qry-select-join.md:
##########
@@ -53,6 +53,30 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria ] | NATURAL j
 
     Specifies an expression with a return type of boolean.
 
+* **nearest_by_clause**
+
+    Specifies a nearest-by top-K ranking join. For each row on the left (query side), returns up to `num_results` rows from the right (base side), ranked by `ranking_expression`. Only `INNER` (the default) and `LEFT OUTER` join types are supported with this clause.
+
+    **Syntax:** `{ APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression`
+
+    `APPROX | EXACT`
+
+    Controls the search algorithm contract. `APPROX` allows the optimizer to use faster approximate strategies (such as indexed nearest-neighbor search when available). `EXACT` forces brute-force evaluation and requires `ranking_expression` to be deterministic.
+
+    `num_results`
+
+    A positive integer literal between 1 and 100000 that limits the number of matches per left row. Defaults to 1 when omitted.

Review Comment:
   Why this limit? Is it controlled by a config?



##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -203,6 +203,31 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
       ctx)
   }
 
+  def nearestByJoinWithLateralUnsupportedError(ctx: ParserRuleContext): Throwable = {
+    new ParseException(
+      errorClass = "UNSUPPORTED_FEATURE.LATERAL_JOIN_NEAREST_BY",
+      messageParameters = Map.empty,
+      ctx)
+  }
+
+  def unsupportedNearestByJoinTypeError(ctx: ParserRuleContext, joinType: String): Throwable = {
+    new ParseException(
+      errorClass = "NEAREST_BY_JOIN.UNSUPPORTED_JOIN_TYPE",
+      messageParameters =
+        Map("joinType" -> toSQLStmt(joinType), "supported" -> "'INNER', 'LEFT OUTER'"),
+      ctx)

Review Comment:
   Can you move `NearestByJoinType` to sql/api?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2362,39 +2362,68 @@ class AstBuilder extends DataTypeAstBuilder
         }
       }
 
-      // Resolve the join type and join condition
-      val (joinType, condition) = Option(ctx.joinCriteria) match {
-        case Some(c) if c.USING != null =>
-          if (ctx.LATERAL != null) {
-            throw QueryParsingErrors.lateralJoinWithUsingJoinUnsupportedError(ctx)
+      if (ctx.nearestByClause != null) {

Review Comment:
   This part of the parser is already non-trivially complex. Can we move the 
NEAREST BY join part into a separate helper method to help with code health?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2362,39 +2362,68 @@ class AstBuilder extends DataTypeAstBuilder
         }
       }
 
-      // Resolve the join type and join condition
-      val (joinType, condition) = Option(ctx.joinCriteria) match {
-        case Some(c) if c.USING != null =>
-          if (ctx.LATERAL != null) {
-            throw QueryParsingErrors.lateralJoinWithUsingJoinUnsupportedError(ctx)
+      if (ctx.nearestByClause != null) {
+        if (ctx.LATERAL != null) {
+          throw QueryParsingErrors.nearestByJoinWithLateralUnsupportedError(ctx)
+        }
+        if (!Seq(Inner, LeftOuter).contains(baseJoinType)) {
+          throw QueryParsingErrors.unsupportedNearestByJoinTypeError(
+            ctx, baseJoinType.sql, NearestByJoinType.supportedDisplay)
+        }
+        val clause = ctx.nearestByClause
+        val approx = clause.APPROX != null
+        val numResults = Option(clause.num).map { n =>
+          // Guard against literals that overflow Long.
+          val value = try n.getText.toLong catch {

Review Comment:
   Could this `.toLong` throw any other kinds of exceptions?
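
For context on the question above: as far as I know, Scala's `String.toLong` delegates to `java.lang.Long.parseLong`, whose documented failure mode is `NumberFormatException`, raised both for non-numeric text and for values outside the `Long` range. A minimal sketch of a narrowly scoped guard follows. `NumResultsSketch` and `parseNumResults` are hypothetical names, not code from this PR, and the 1 to 100000 bounds are taken from the docs hunk quoted in this review.

```scala
object NumResultsSketch {
  // Assumed upper bound, mirroring the value quoted in this review.
  val MaxNumResults: Int = 100000

  /** Parses the literal text and range-checks it; returns an error message on failure. */
  def parseNumResults(text: String): Either[String, Int] = {
    // Catch only NumberFormatException so that unrelated failures still propagate.
    val parsed: Option[Long] =
      try Some(text.toLong) catch { case _: NumberFormatException => None }
    parsed match {
      case Some(v) if v >= 1 && v <= MaxNumResults => Right(v.toInt)
      case _ =>
        Left(s"The number of results $text must be between 1 and $MaxNumResults.")
    }
  }
}
```

Catching the specific exception type, rather than a blanket `Throwable`, keeps the guard from masking unrelated errors.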



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
     ],
     "sqlState" : "0A000"
   },
+  "NEAREST_BY_JOIN" : {
+    "message" : [
+      "Invalid nearest-by join."
+    ],
+    "subClass" : {
+      "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+        "message" : [
+          "EXACT nearest-by join is incompatible with the nondeterministic ranking expression <expression>. Use APPROX, or replace the expression with a deterministic one."
+        ]
+      },
+      "NON_ORDERABLE_RANKING_EXPRESSION" : {
+        "message" : [
+          "The ranking expression <expression> of type <type> is not orderable."
+        ]
+      },
+      "NUM_RESULTS_OUT_OF_RANGE" : {
+        "message" : [
+          "The number of results <numResults> must be between <min> and <max>."
+        ]
+      },
+      "STREAMING_NOT_SUPPORTED" : {
+        "message" : [
+          "nearest-by join is not supported with streaming DataFrames/Datasets."

Review Comment:
   ```suggestion
             "Nearest-by join is not supported with streaming DataFrames/Datasets."
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
     }
   }
 }
+
+object NearestByJoin {
+  /** Upper bound on `numResults`. Mirrors the K-overload limit of `MaxMinByK`. */
+  val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ *   - `NearestByDistance`: smallest values of `rankingExpression` first.
+ *   - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP. Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx = true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    joinType: JoinType,
+    approx: Boolean,
+    numResults: Int,

Review Comment:
   Can we have `@param` items explaining what each of these arguments 
represents?
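
To make the request concrete, here is a rough sketch of what the `@param` entries could look like. The wording of each entry is inferred from the doc comment and the docs hunk quoted in this review, so treat it as a starting point rather than final text. The stub traits and objects are hypothetical stand-ins so the snippet compiles outside Spark.

```scala
// Hypothetical stand-ins so the sketch compiles outside Spark; the real types live in Catalyst.
trait LogicalPlan
trait Expression
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
sealed trait NearestByDirection
case object NearestByDistance extends NearestByDirection
case object NearestBySimilarity extends NearestByDirection

/**
 * A logical plan for a nearest-by top-K ranking join.
 *
 * @param left the query side; every row here is matched against `right`.
 * @param right the base side that supplies candidate matches.
 * @param joinType `Inner` or `LeftOuter`; other join types are rejected.
 * @param approx true for APPROX, false for EXACT.
 * @param numResults maximum number of right-side matches returned per left row.
 * @param rankingExpression expression whose value ranks right-side rows; must be orderable.
 * @param direction `NearestByDistance` (smallest values first) or
 *                  `NearestBySimilarity` (largest values first).
 */
case class NearestByJoin(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    approx: Boolean,
    numResults: Int,
    rankingExpression: Expression,
    direction: NearestByDirection) {
  require(Seq(Inner, LeftOuter).contains(joinType),
    s"Unsupported nearest-by join type $joinType")
}
```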



##########
docs/sql-ref-syntax-qry-select-join.md:
##########
@@ -53,6 +53,30 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria ] | NATURAL j
 
     Specifies an expression with a return type of boolean.
 
+* **nearest_by_clause**
+
+    Specifies a nearest-by top-K ranking join. For each row on the left (query side), returns up to `num_results` rows from the right (base side), ranked by `ranking_expression`. Only `INNER` (the default) and `LEFT OUTER` join types are supported with this clause.
+
+    **Syntax:** `{ APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression`
+
+    `APPROX | EXACT`
+
+    Controls the search algorithm contract. `APPROX` allows the optimizer to use faster approximate strategies (such as indexed nearest-neighbor search when available). `EXACT` forces brute-force evaluation and requires `ranking_expression` to be deterministic.
+
+    `num_results`
+
+    A positive integer literal between 1 and 100000 that limits the number of matches per left row. Defaults to 1 when omitted.
+
+    `DISTANCE | SIMILARITY`
+
+    `DISTANCE` ranks rows by smallest value of `ranking_expression` first. `SIMILARITY` ranks rows by largest value first.
+
+    `ranking_expression`
+
+    A scalar expression that returns an orderable type.

Review Comment:
   Should we mention any requirements about the determinism of this expression, 
or the presence of possible side-effects (e.g. UDFs)?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2541,14 +2542,36 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan =
+  def apply(plan: LogicalPlan): LogicalPlan = {
     if (conf.crossJoinEnabled) {
-      plan
-    } else plan.transformWithPruning(_.containsAnyPattern(INNER_LIKE_JOIN, OUTER_JOIN))  {
+      return plan
+    }
+
+    // Joins synthesized by `RewriteNearestByJoin` are an intentional, bounded cross-product
+    // wrapped by a `MaxMinByK` aggregate. Identify them by their unambiguous post-rewrite
+    // signature -- `Aggregate(_, exprs, Join(_, _, LeftOuter, None, _))` where `exprs`
+    // contains a `MaxMinByK` -- and skip them so user queries written as `NEAREST BY` are not
+    // rejected when `spark.sql.crossJoin.enabled = false`. We use structural detection rather
+    // than a `TreeNodeTag` because a tag set on the `Join` would be silently dropped by any
+    // intervening optimizer rule that constructs a fresh `Join` via the case-class
+    // constructor without calling `copyTagsFrom`.
+    val nearestByJoins: java.util.IdentityHashMap[Join, Unit] = {
+      val acc = new java.util.IdentityHashMap[Join, Unit]()
+      plan.foreach {

Review Comment:
   This descends arbitrarily deep into the rest of the query plan, including 
through other joins and union operators, but does not descend into subquery 
expression plans. Do we really want to collect this deep? Would the presence of 
a `MaxMinByK` below more levels of joins really change anything here, for 
example?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
     }
   }
 }
+
+object NearestByJoin {
+  /** Upper bound on `numResults`. Mirrors the K-overload limit of `MaxMinByK`. */
+  val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ *   - `NearestByDistance`: smallest values of `rankingExpression` first.
+ *   - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP. Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx = true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(

Review Comment:
   This operator is not basic IMO; can we move this to a separate file for 
better code health?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
     }
   }
 }
+
+object NearestByJoin {
+  /** Upper bound on `numResults`. Mirrors the K-overload limit of `MaxMinByK`. */
+  val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ *   - `NearestByDistance`: smallest values of `rankingExpression` first.
+ *   - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP. Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx = true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    joinType: JoinType,
+    approx: Boolean,
+    numResults: Int,
+    rankingExpression: Expression,
+    direction: NearestByDirection)
+  extends BinaryNode with SupportsNonDeterministicExpression {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported nearest-by join type $joinType")
+
+  // APPROX permits a nondeterministic ranking expression (per the SPIP); the rewrite

Review Comment:
   This comment is incomplete, and we shouldn't mention the SPIP here. Let's 
make this comment self-explanatory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

