sunchao commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3245892738


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -552,7 +552,9 @@ object KeyedPartitioning {
 
   def supportsExpressions(expressions: Seq[Expression]): Boolean = {
     def isSupportedTransform(transform: TransformExpression): Boolean = {
-      transform.children.size == 1 && isReference(transform.children.head)
+      // TransformExpression.collectLeaves() only returns column references, 
not literals.
+      // We need exactly one column reference per transform.
+      transform.collectLeaves().size == 1

Review Comment:
   [P1] This widens support from transforms with one direct reference child to 
any transform whose `collectLeaves()` returns one leaf. Because `Transform` 
arguments can themselves be nested `Transform`s, `V2ExpressionUtils` can 
materialize shapes like `outer(years(k))` and `outer(days(k))`. The new gate 
accepts both, `keyPositions` later maps them only by leaf `k`, and 
`TransformExpression.isSameFunction` compares only the outer function name plus 
literals. That can make storage partitionings with different nested child 
semantics look compatible and let SPJ skip a required shuffle, which can drop 
join matches. Keep the old direct-reference constraint, or compare the full 
non-literal child semantics before admitting these transforms.
   
   _\[ :robot: posted by Codex on behalf of sunchao :robot: \]_



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,22 +144,47 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters from a transform expression.
+   * Returns ReducibleParameters containing the literal values in order.
+   *
+   * Examples:
+   *   bucket(4, col)        => ReducibleParameters([4])
+   *   truncate(col, 3)      => ReducibleParameters([3])
+   *   days(col)             => ReducibleParameters([])  (no literals)
+   */
+  private def extractParameters(expr: TransformExpression): 
ReducibleParameters = {
+    import scala.jdk.CollectionConverters._
+    val values = expr.literalChildren.map {
+      case Literal(value, _) => value.asInstanceOf[AnyRef]

Review Comment:
   [P1] `extractParameters` forwards raw Catalyst `Literal.value` objects into 
`ReducibleParameters`. For string literals, Spark stores `UTF8String`, while 
the new public API documents string parameters and exposes `getString()` as a 
`java.lang.String` cast. A connector implementing the documented 
string-parameter case will get `ClassCastException` from `getString(0)` or be 
forced to depend on Spark internals. Convert literal values to connector-facing 
external values by `dataType` before constructing `ReducibleParameters`.
   
   _\[ :robot: posted by Codex on behalf of sunchao :robot: \]_



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,22 +144,47 @@ case class TransformExpression(
    */
   def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
     (function, other.function) match {
-      case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
-        reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+      case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+        reducer(e1, this, e2, other)
       case _ => None
     }
   }
 
-  // Return a Reducer for a reducible function on another reducible function
+  /**
+   * Extract all literal parameters from a transform expression.
+   * Returns ReducibleParameters containing the literal values in order.
+   *
+   * Examples:
+   *   bucket(4, col)        => ReducibleParameters([4])
+   *   truncate(col, 3)      => ReducibleParameters([3])
+   *   days(col)             => ReducibleParameters([])  (no literals)
+   */
+  private def extractParameters(expr: TransformExpression): 
ReducibleParameters = {
+    import scala.jdk.CollectionConverters._
+    val values = expr.literalChildren.map {
+      case Literal(value, _) => value.asInstanceOf[AnyRef]
+    }
+    new ReducibleParameters(values.asJava)
+  }
+
+  /**
+   * Return a Reducer for a reducible function on another reducible function
+   * Handles both parameterized (bucket, truncate) and non-parameterized 
(days, hours) functions.
+   */
   private def reducer(
       thisFunction: ReducibleFunction[_, _],
-      thisNumBucketsOpt: Option[Int],
+      thisExpr: TransformExpression,
       otherFunction: ReducibleFunction[_, _],
-      otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = {
-    val res = (thisNumBucketsOpt, otherNumBucketsOpt) match {
-      case (Some(numBuckets), Some(otherNumBuckets)) =>
-        thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets)
-      case _ => thisFunction.reducer(otherFunction)
+      otherExpr: TransformExpression): Option[Reducer[_, _]] = {
+    val thisParams = extractParameters(thisExpr)
+    val otherParams = extractParameters(otherExpr)
+
+    val res = if (!thisParams.isEmpty && !otherParams.isEmpty) {

Review Comment:
   [P2] The new generalized reducer API accepts `ReducibleParameters` on both 
sides, and this file even models zero-literal transforms as 
`ReducibleParameters([])`. But this dispatch only invokes the generalized 
overload when both sides are non-empty. Mixed cases such as 
parameterized-vs-zero-parameter transforms instead fall back to 
`reducer(otherFunction)`, so connectors that correctly implement the new 
generalized overload for those cases are never invoked; the default legacy path 
can even throw `UnsupportedOperationException`. If mixed arity is meant to be 
unsupported, the new API/docs should say that explicitly. Otherwise this should 
dispatch through the generalized overload whenever either side wants that path.
   
   _\[ :robot: posted by Codex on behalf of sunchao :robot: \]_



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to