sunchao commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3377019609


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +119,76 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters from a transform expression.
+   * Returns ReducibleParameters containing the literal values in order.
+   *
+   * Examples:
+   *   bucket(4, col)        => ReducibleParameters([4])
+   *   truncate(col, 3)      => ReducibleParameters([3])
+   *   days(col)             => ReducibleParameters([])  (no literals)
+   */
+  private def extractParameters(expr: TransformExpression): ReducibleParameters = {
+    import scala.jdk.CollectionConverters._
+    val values = expr.literalChildren.map {
+      case Literal(value, dt) => CatalystTypeConverters.convertToScala(value, dt)
+    }
+    new ReducibleParameters(values.asJava)
+  }
+
+  /**
+   * Return a Reducer for a reducible function on another reducible function
+   * Handles both parameterized (bucket, truncate) and non-parameterized (days, hours) functions.
+   */
   private def reducer(
       thisFunction: ReducibleFunction[_, _],
-      thisNumBucketsOpt: Option[Int],
+      thisExpr: TransformExpression,
       otherFunction: ReducibleFunction[_, _],
-      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
-    val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
-      case (Some(numBuckets), Some(otherNumBuckets)) =>
-        thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
-      case _ => thisFunction.reducer(otherFunction)
+      otherExpr: TransformExpression): Option[Reducer[_, _]] = {
+    val thisParams = extractParameters(thisExpr)

Review Comment:
   [P1] Please preserve the column/literal argument layout before invoking a 
reducer. The strict `supportsExpressions` gate only requires one direct column 
reference; it does not require that reference to occupy the same child position 
on both sides. `extractParameters` then keeps only the literal values and drops 
their positions, and the latest commit removed the positional 
`nonLiteralChildrenSame` guard.
   
   This is reachable for a public V2 function with two same-typed arguments. 
For example, `f(id, 2)` and `f(4, store_id)` both pass the gate. The 
generalized reducer sees only `[2]` and `[4]` and can report compatibility. I 
reproduced this on the current head with the integer-truncate reducer: both 
transforms were admitted, `isCompatible` returned true, and matching join value 
`3` mapped to reduced key `0` on the left versus key `3` on the right. SPJ can 
therefore skip a required shuffle and lose the match.
   
   Please restore a positional-shape check before reducer dispatch: literal 
values may differ, but literal and column-reference slots must align on both 
sides.
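
   A minimal sketch of the requested positional-shape check, written against a hypothetical, simplified argument model: `Arg`, `ColRef`, `Lit`, `ShapeCheck`, and `ShapeCheckDemo` are illustrative names, not Spark's Catalyst classes. It shows that the literal-only view keeps `[2]` and `[4]` with no record of which slot each came from, while a per-slot comparison of literal versus column positions rejects the pair:

```scala
// Hypothetical, simplified model of transform arguments (not Spark's Expression API).
sealed trait Arg
final case class ColRef(name: String) extends Arg
final case class Lit(value: Any) extends Arg

object ShapeCheck {
  // Literal-only view, analogous to extractParameters: values kept, positions lost.
  def literalValues(args: Seq[Arg]): Seq[Any] =
    args.collect { case Lit(v) => v }

  // Positional shape: true where the slot holds a literal, false for a column reference.
  def shape(args: Seq[Arg]): Seq[Boolean] =
    args.map {
      case Lit(_)    => true
      case ColRef(_) => false
    }

  // Literal values may differ, but literal and column slots must line up.
  def sameShape(left: Seq[Arg], right: Seq[Arg]): Boolean =
    shape(left) == shape(right)
}

object ShapeCheckDemo {
  val left: Seq[Arg] = Seq(ColRef("id"), Lit(2))         // f(id, 2)
  val right: Seq[Arg] = Seq(Lit(4), ColRef("store_id"))  // f(4, store_id)
}
```

   In the real code, the same comparison could presumably run over the transform's child expressions before `extractParameters`, so that only layout-compatible pairs ever reach reducer dispatch.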
   
   _Posted by Codex on behalf of @sunchao._



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +119,76 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters from a transform expression.
+   * Returns ReducibleParameters containing the literal values in order.
+   *
+   * Examples:
+   *   bucket(4, col)        => ReducibleParameters([4])
+   *   truncate(col, 3)      => ReducibleParameters([3])
+   *   days(col)             => ReducibleParameters([])  (no literals)
+   */
+  private def extractParameters(expr: TransformExpression): ReducibleParameters = {
+    import scala.jdk.CollectionConverters._
+    val values = expr.literalChildren.map {
+      case Literal(value, dt) => CatalystTypeConverters.convertToScala(value, dt)
+    }
+    new ReducibleParameters(values.asJava)
+  }
+
+  /**
+   * Return a Reducer for a reducible function on another reducible function
+   * Handles both parameterized (bucket, truncate) and non-parameterized (days, hours) functions.
+   */
   private def reducer(
       thisFunction: ReducibleFunction[_, _],
-      thisNumBucketsOpt: Option[Int],
+      thisExpr: TransformExpression,
       otherFunction: ReducibleFunction[_, _],
-      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
-    val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
-      case (Some(numBuckets), Some(otherNumBuckets)) =>
-        thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
-      case _ => thisFunction.reducer(otherFunction)
+      otherExpr: TransformExpression): Option[Reducer[_, _]] = {
+    val thisParams = extractParameters(thisExpr)
+    val otherParams = extractParameters(otherExpr)
+    val thisName = thisExpr.function.canonicalName()
+
+    def isSingleInt(p: ReducibleParameters): Boolean = {
+      p.count() == 1 && p.get(0).isInstanceOf[Int]
+    }
+
+    // Both thrown exceptions and `null` returns collapse to None; any failure
+    // to compute a reducer falls back to a shuffle (no SPJ).
+    def tryReduce[R](call: => R): Try[Option[R]] = {
+      val attempt = Try(Option(call))
+      attempt.failed.foreach {
+        case e: UnsupportedOperationException =>
+          logWarning(log"V2 function ${MDC(FUNCTION_NAME, thisName)} threw " +
+            log"UnsupportedOperationException; treating as not reducible. Override " +
+            log"reducer(ReducibleParameters, ReducibleFunction, ReducibleParameters) " +
+            log"to enable SPJ.")
+        case _ =>
+      }
+
+      attempt
     }
-    Option(res)
+
+    val res: Try[Option[Reducer[_, _]]] =
+      if (thisParams.isEmpty && otherParams.isEmpty) {
+        tryReduce(thisFunction.reducer(otherFunction))
+      } else if (isSingleInt(thisParams) && isSingleInt(otherParams)) {
+        // Try deprecated int-API first for legacy connectors (e.g. Iceberg 1.10);
+        // the first attempt is silent because we have a fallback. Only the fallback warns.
+        Try(Option(thisFunction.reducer(

Review Comment:
   [P2] `Try(Option(oldReducer)).orElse(genericReducer)` only evaluates the 
generalized overload when the deprecated call throws. A documented `null` 
return becomes `Success(None)`, so `Try.orElse` returns it without executing 
the fallback.
   
   I reproduced this on the current head with a function implementing both 
overloads: the deprecated overload returned `null`, the generalized overload 
returned a valid reducer, and `reducers(...).isDefined` was still false. For a 
connector that keeps the deprecated method while implementing broader 
compatibility through the replacement API, Spark will therefore miss a valid 
reducer and add an unnecessary shuffle.
   
   Please inspect the inner `Option` and invoke the generalized overload after 
either a failed deprecated call or `None`. A regression test should cover 
legacy-null/new-success.
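
   A minimal sketch of the requested dispatch, assuming a simplified, hypothetical interface: `LegacyAndGeneric`, `Dispatch`, and `LegacyNullGenericSuccess` are illustrative names, not Spark's `ReducibleFunction` API, and `String` stands in for `Reducer`. A thrown exception and a `null` both collapse to `None` before the fallback decision, and `Option.orElse` evaluates the generalized call only when the legacy attempt yields `None`:

```scala
import scala.util.Try

// Hypothetical simplified interface; the real overloads take
// ReducibleFunction / ReducibleParameters arguments and return a Reducer.
trait LegacyAndGeneric {
  def legacyReducer(numBuckets: Int, otherNumBuckets: Int): String
  def genericReducer(params: Seq[Any], otherParams: Seq[Any]): String
}

object Dispatch {
  // Failure(_) and Success(None) (a null return) both become None.
  private def attempt[R](call: => R): Option[R] =
    Try(Option(call)).toOption.flatten

  // Legacy overload first; generalized overload after a failure OR a null.
  def reduce(f: LegacyAndGeneric, n: Int, otherN: Int): Option[String] =
    attempt(f.legacyReducer(n, otherN))
      .orElse(attempt(f.genericReducer(Seq(n), Seq(otherN))))
}

// Regression scenario from this comment: legacy returns null, generic succeeds.
object LegacyNullGenericSuccess extends LegacyAndGeneric {
  def legacyReducer(numBuckets: Int, otherNumBuckets: Int): String = null
  def genericReducer(params: Seq[Any], otherParams: Seq[Any]): String = "generic"
}
```

   With this shape, `Dispatch.reduce(LegacyNullGenericSuccess, 4, 2)` yields `Some("generic")`, which is the outcome the legacy-null/new-success regression test would assert.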
   
   _Posted by Codex on behalf of @sunchao._



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.