sunchao commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3455877816


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +126,94 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters of this transform as V2 [[V2Literal]]s, preserving each value's
+   * internal representation and its `DataType`. Connectors interpret the value via the accompanying
+   * `DataType` rather than relying on a pre-converted JVM type.
+   *
+   * Examples:
+   *   bucket(4, col)        => [Literal(4, IntegerType)]
+   *   truncate(col, 3)      => [Literal(3, IntegerType)]
+   *   days(col)             => []  (no literals)
+   */
+  private def extractParameters: Array[V2Literal[_]] =
+    literalChildren.map(l => LiteralValue(l.value, l.dataType): V2Literal[_]).toArray
+
+  /**
+   * Reducer precondition: same argument layout/structure as `other` (arity, aligned slots, equal
+   * nested transforms, column refs elsewhere). Only literal *values* may differ. Unlike
+   * [[isSameFunction]] the function name is not compared.
+   */
+  private def sameArgumentLayout(other: TransformExpression): Boolean =

Review Comment:
   [P2] This equal-arity precondition prevents the generalized reducer from 
handling valid mixed-arity transform pairs. For example, both `raw(x)` and 
`mod(x, 2)` pass `KeyedPartitioning.supportsExpressions` because each has one 
direct column reference, and a connector can validly implement a reducer for 
`[]` versus `[2]`; however, `childrenMatch` rejects their `[column]` versus 
`[column, literal]` children before either reducer is called. This reintroduces 
the parameterized-vs-zero-parameter gap discussed earlier, where we agreed to 
leave compatibility to the connector, and forces an unnecessary shuffle. Please 
preserve the positional slot-safety check without globally requiring equal 
child counts, and add a zero-vs-one-parameter regression test.
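
   The gap can be illustrated with a minimal, self-contained sketch. It 
   assumes one possible reading of "positional slot-safety without equal child 
   counts" and is not the PR's implementation: `Arg`, `Column`, `Lit`, 
   `strictLayout` and `relaxedLayout` are hypothetical stand-ins, not Catalyst 
   classes, and nested transforms are left out for brevity.

```scala
object LayoutSketch {
  // Hypothetical stand-ins for a transform's children; these are not Spark classes.
  sealed trait Arg
  case object Column extends Arg                  // a direct column reference
  final case class Lit(value: Any) extends Arg    // a literal parameter

  // Equal-arity rule: same child count and the same kind of argument in every slot.
  // Literal values are allowed to differ.
  def strictLayout(a: Seq[Arg], b: Seq[Arg]): Boolean =
    a.length == b.length && a.zip(b).forall {
      case (Column, Column) => true
      case (_: Lit, _: Lit) => true
      case _                => false
    }

  // Relaxed rule (an assumption, one of several possible designs): keep the per-slot
  // check when arities match; otherwise only require that the non-literal arguments
  // agree in order, and leave literal compatibility to the connector's reducer.
  def relaxedLayout(a: Seq[Arg], b: Seq[Arg]): Boolean =
    if (a.length == b.length) strictLayout(a, b)
    else a.filterNot(_.isInstanceOf[Lit]) == b.filterNot(_.isInstanceOf[Lit])
}
```

   With `raw(x)` modelled as `Seq(Column)` and `mod(x, 2)` as `Seq(Column, 
   Lit(2))`, `strictLayout` rejects the pair while `relaxedLayout` accepts it; 
   `bucket(4, col)` versus `truncate(col, 3)` is still rejected by both because 
   the slots are misaligned.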



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +126,94 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters of this transform as V2 [[V2Literal]]s, preserving each value's
+   * internal representation and its `DataType`. Connectors interpret the value via the accompanying
+   * `DataType` rather than relying on a pre-converted JVM type.
+   *
+   * Examples:
+   *   bucket(4, col)        => [Literal(4, IntegerType)]
+   *   truncate(col, 3)      => [Literal(3, IntegerType)]
+   *   days(col)             => []  (no literals)
+   */
+  private def extractParameters: Array[V2Literal[_]] =
+    literalChildren.map(l => LiteralValue(l.value, l.dataType): V2Literal[_]).toArray
+
+  /**
+   * Reducer precondition: same argument layout/structure as `other` (arity, aligned slots, equal
+   * nested transforms, column refs elsewhere). Only literal *values* may differ. Unlike
+   * [[isSameFunction]] the function name is not compared.
+   */
+  private def sameArgumentLayout(other: TransformExpression): Boolean =
+    childrenMatch(other)((_, _) => true)
+
+  /**
+   * Whether every literal parameter is a scalar (an [[AtomicType]]). Reducer parameters are scalar
+   * literals; this never forwards a complex Catalyst container (ArrayData / MapData / InternalRow)
+   * across the public reducer boundary -- such a transform is simply treated as not reducible.
+   */
+  private def scalarLiteralParams: Boolean =
+    literalChildren.forall(_.dataType.isInstanceOf[AtomicType])

Review Comment:
   [P2] `AtomicType` is stricter than the documented non-complex scalar 
contract. `CalendarIntervalType` is explicitly non-complex in Spark, although 
it does not extend `AtomicType`, and `Expressions.literal(new 
CalendarInterval(...))` creates a valid self-describing V2 literal. A connector 
whose transform returns a comparable key type can therefore pass 
`KeyedPartitioning.supportsExpressions`, yet differing interval parameters are 
rejected here before its generalized reducer can reconcile them, forcing an 
unnecessary shuffle. The public reducer Javadoc excludes array/map/struct 
parameters and says the new overload supports parameters of any type. Please 
reject the specified container types rather than all non-`AtomicType` values, 
and add a positive `CalendarIntervalType` reducer test.
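
   As a rough sketch of the suggested check, assuming a deny-list over 
   container types is acceptable: the types below are simplified stand-ins that 
   mirror Spark's `ArrayType`, `MapType`, `StructType`, `IntegerType` and 
   `CalendarIntervalType`, and `isContainer` / `reducibleParams` are 
   hypothetical names, not methods from the PR.

```scala
object ScalarParamSketch {
  // Simplified stand-ins for Spark data types; these are not the real classes.
  sealed trait DType
  case object IntLike extends DType                      // mirrors an atomic type
  case object CalendarIntervalLike extends DType         // non-atomic, but not a container
  final case class ArrayLike(element: DType) extends DType
  final case class MapLike(key: DType, value: DType) extends DType
  final case class StructLike(fields: Seq[DType]) extends DType

  // Deny-list: only array/map/struct count as complex containers.
  def isContainer(dt: DType): Boolean = dt match {
    case _: ArrayLike | _: MapLike | _: StructLike => true
    case _                                         => false
  }

  // Literal parameters may cross the reducer boundary unless one of them is a container.
  def reducibleParams(paramTypes: Seq[DType]): Boolean =
    !paramTypes.exists(isContainer)
}
```

   Under this rule an interval parameter is forwarded to the connector's 
   reducer, while an array, map or struct parameter still makes the transform 
   non-reducible.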



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

