szehon-ho commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3456389747


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +126,94 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters of this transform as V2 [[V2Literal]]s, 
preserving each value's
+   * internal representation and its `DataType`. Connectors interpret the 
value via the accompanying
+   * `DataType` rather than relying on a pre-converted JVM type.
+   *
+   * Examples:
+   *   bucket(4, col)        => [Literal(4, IntegerType)]
+   *   truncate(col, 3)      => [Literal(3, IntegerType)]
+   *   days(col)             => []  (no literals)
+   */
+  private def extractParameters: Array[V2Literal[_]] =
+    literalChildren.map(l => LiteralValue(l.value, l.dataType): 
V2Literal[_]).toArray
+
+  /**
+   * Reducer precondition: same argument layout/structure as `other` (arity, 
aligned slots, equal
+   * nested transforms, column refs elsewhere). Only literal *values* may 
differ. Unlike
+   * [[isSameFunction]] the function name is not compared.
+   */
+  private def sameArgumentLayout(other: TransformExpression): Boolean =
+    childrenMatch(other)((_, _) => true)
+
+  /**
+   * Whether every literal parameter is a scalar (an [[AtomicType]]). Reducer 
parameters are scalar
+   * literals; this never forwards a complex Catalyst container (ArrayData / 
MapData / InternalRow)
+   * across the public reducer boundary -- such a transform is simply treated 
as not reducible.
+   */
+  private def scalarLiteralParams: Boolean =
+    literalChildren.forall(_.dataType.isInstanceOf[AtomicType])
+
+  /**
+   * Return a Reducer for a reducible function on another reducible function
+   * Handles both parameterized (bucket, truncate) and non-parameterized 
(days, hours) functions.
+   */
   private def reducer(
       thisFunction: ReducibleFunction[_, _],
-      thisNumBucketsOpt: Option[Int],
+      thisExpr: TransformExpression,
       otherFunction: ReducibleFunction[_, _],
-      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
-    val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
-      case (Some(numBuckets), Some(otherNumBuckets)) =>
-        thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
-      case _ => thisFunction.reducer(otherFunction)
+      otherExpr: TransformExpression): Option[Reducer[_, _]] = {
+    if (!thisExpr.sameArgumentLayout(otherExpr) ||
+        !thisExpr.scalarLiteralParams || !otherExpr.scalarLiteralParams) {
+      return None
+    }
+
+    val thisParams = thisExpr.extractParameters
+    val otherParams = otherExpr.extractParameters
+    val thisName = thisExpr.function.canonicalName()
+
+    // Gate on DataType, not the boxed runtime class 
(DateType/YearMonthInterval box to Int).
+    def isSingleInt(p: Array[V2Literal[_]]): Boolean = {
+      p.length == 1 && p(0).dataType == IntegerType
+    }
+
+    // Run a reducer overload; a thrown exception or a null both become None. 
warnOnUoe logs a hint
+    // when the function implements no usable reducer overload.
+    def attempt[R](call: => R, warnOnUoe: Boolean): Option[R] = {
+      val t = Try(Option(call))
+      if (warnOnUoe) {
+        t.failed.foreach {
+          case _: UnsupportedOperationException =>
+            logWarning(log"V2 function ${MDC(FUNCTION_NAME, thisName)} threw " 
+
+              log"UnsupportedOperationException; treating as not reducible. 
Override " +
+              log"reducer(Literal[], ReducibleFunction, Literal[]) to enable 
SPJ.")
+          case _ =>
+        }
+      }
+      t.toOption.flatten
+    }
+
+    if (thisParams.isEmpty && otherParams.isEmpty) {
+      attempt(thisFunction.reducer(otherFunction), warnOnUoe = true)
+    } else if (isSingleInt(thisParams) && isSingleInt(otherParams)) {
+      // Try the deprecated int API first (legacy connectors); fall back to 
the generalized overload
+      // when it is absent or returns null. Option.orElse fires on None, 
covering both.
+      attempt(thisFunction.reducer(
+          thisParams(0).value().asInstanceOf[Int], otherFunction,
+          otherParams(0).value().asInstanceOf[Int]), warnOnUoe = false)
+        .orElse(
+          attempt(thisFunction.reducer(thisParams, otherFunction, 
otherParams), warnOnUoe = true))
+    } else {
+      // Parameterized functions (bucket, truncate, etc.)
+      attempt(thisFunction.reducer(thisParams, otherFunction, otherParams), 
warnOnUoe = true)
     }

Review Comment:
   Optional cleanup (non-blocking): the `warnOnUoe` flag encodes a 
whole-function question -- "did *no* overload turn out to be implemented?" -- 
as a per-attempt flag. You can drop the flag by giving `attempt` a three-state 
result (`None` = not implemented / threw, `Some(None)` = implemented but not 
reducible, `Some(r)` = reducible) and deciding the warning once from the 
aggregate.
   
   As a bonus, this fixes a subtle false warning: today, if a connector 
implements the deprecated int API and *deliberately* returns `null` (not 
reducible) while not implementing the new `Literal[]` API, the fallback attempt 
throws UOE and `warnOnUoe = true` fires -- even though the function is 
implemented and intentionally said "no". With the version below, that case is 
`Some(None)` and does not trigger the hint; the warning fires only when *every* 
overload threw.
   
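   Note that `Try(Option(call)).toOption` preserves the current behavior of 
treating *any non-fatal* exception as "not implemented" (same as the existing 
`t.toOption.flatten`; `Try` never catches fatal errors such as 
`VirtualMachineError`). If you'd rather treat only 
`UnsupportedOperationException` that way and rethrow everything else, a 
`Try(...) match { case Success(r) => Some(r); case Failure(_: 
UnsupportedOperationException) => None; case Failure(e) => throw e }` form 
works too.
   
   One behavioral difference to be aware of: `Seq(attempt(...), attempt(...))` 
evaluates both overloads eagerly. In the single-int case, the generalized 
`Literal[]` overload is therefore invoked even when the deprecated int overload 
already returned a reducer, whereas today's `orElse` calls it only as a 
fallback. If that matters, build the attempts lazily, e.g. 
`attempt(...) #:: attempt(...) #:: LazyList.empty` (note that `LazyList(a, b)` 
evaluates its arguments eagerly). The warning check is unaffected, because it 
only needs every attempt evaluated when no reducer was found.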
   
   ```suggestion
       // Probe a reducer overload, distinguishing three outcomes so the 
warning fires only when *no*
       // overload is implemented -- not on the deprecated-API probe, which 
throws UOE by design for
       // connectors that implement only the new Literal[] API:
       //   None       -> overload not implemented (threw, e.g. 
UnsupportedOperationException)
       //   Some(None) -> implemented, but not reducible for these params 
(returned null)
       //   Some(r)    -> implemented and reducible
       def attempt(call: => Reducer[_, _]): Option[Option[Reducer[_, _]]] =
         Try(Option(call)).toOption
   
       val attempts: Seq[Option[Option[Reducer[_, _]]]] =
         if (thisParams.isEmpty && otherParams.isEmpty) {
           Seq(attempt(thisFunction.reducer(otherFunction)))
         } else if (isSingleInt(thisParams) && isSingleInt(otherParams)) {
           // Try the deprecated int API first (legacy connectors), then the 
generalized overload.
           Seq(
             attempt(thisFunction.reducer(
               thisParams(0).value().asInstanceOf[Int], otherFunction,
               otherParams(0).value().asInstanceOf[Int])),
             attempt(thisFunction.reducer(thisParams, otherFunction, 
otherParams)))
         } else {
           // Parameterized functions (bucket, truncate, etc.)
           Seq(attempt(thisFunction.reducer(thisParams, otherFunction, 
otherParams)))
         }
   
       // First implemented-and-reducible overload wins. Warn only when every 
overload threw (i.e.
       // nothing is implemented); a deliberate null from an implemented 
overload is Some(None) and
       // does not trigger the hint.
       val result = attempts.flatten.flatten.headOption
       if (result.isEmpty && attempts.forall(_.isEmpty)) {
         logWarning(log"V2 function ${MDC(FUNCTION_NAME, thisName)} implements 
no reducer; " +
           log"treating as not reducible. Override " +
           log"reducer(Literal[], ReducibleFunction, Literal[]) to enable SPJ.")
       }
       result
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to