sunchao commented on code in PR #55885:
URL: https://github.com/apache/spark/pull/55885#discussion_r3455877816
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +126,94 @@ case class TransformExpression(
*/
def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
(function, other.function) match {
- case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
- reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+ case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+ reducer(e1, this, e2, other)
case _ => None
}
}
- // Return a Reducer for a reducible function on another reducible function
+ /**
+ * Extract all literal parameters of this transform as V2 [[V2Literal]]s, preserving each value's
+ * internal representation and its `DataType`. Connectors interpret the value via the accompanying
+ * `DataType` rather than relying on a pre-converted JVM type.
+ *
+ * Examples:
+ * bucket(4, col) => [Literal(4, IntegerType)]
+ * truncate(col, 3) => [Literal(3, IntegerType)]
+ * days(col) => [] (no literals)
+ */
+ private def extractParameters: Array[V2Literal[_]] =
+ literalChildren.map(l => LiteralValue(l.value, l.dataType): V2Literal[_]).toArray
+
+ /**
+ * Reducer precondition: same argument layout/structure as `other` (arity, aligned slots, equal
+ * nested transforms, column refs elsewhere). Only literal *values* may differ. Unlike
+ * [[isSameFunction]] the function name is not compared.
+ */
+ private def sameArgumentLayout(other: TransformExpression): Boolean =
Review Comment:
[P2] This equal-arity precondition prevents the generalized reducer from
handling valid mixed-arity transform pairs. For example, both `raw(x)` and
`mod(x, 2)` pass `KeyedPartitioning.supportsExpressions` because each has one
direct column reference, and a connector can validly implement a reducer
between the parameter lists `[]` and `[2]`. However, `childrenMatch` rejects
their `[column]` versus `[column, literal]` children before either reducer is
called. This reintroduces the parameterized-vs-zero-parameter gap discussed
earlier, where we agreed to leave compatibility to the connector, and it
forces an unnecessary shuffle. Please preserve the positional slot-safety
check without globally requiring equal child counts, and add a
zero-vs-one-parameter regression test.
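A minimal sketch of one possible reading of "positional slot-safety without equal child counts": slots present on both sides must be the same kind (column vs. column, literal vs. literal), and any slots present on only one side must be literals. `Arg`, `ColumnRef`, `Lit`, and `ArgumentLayout.compatible` are hypothetical stand-ins, not Catalyst classes, and nested transforms are deliberately not modeled.

```scala
// Hypothetical, simplified model of a transform's argument slots. These types are
// illustrative stand-ins, not Catalyst classes; nested transforms are not modeled.
sealed trait Arg
final case class ColumnRef(name: String) extends Arg
final case class Lit(value: Any) extends Arg

object ArgumentLayout {
  // One possible positional slot-safety check that tolerates different arities:
  //  - slots present on both sides must be the same kind (column/column or
  //    literal/literal); column names and literal values may differ, since the
  //    two join sides reference different columns and the reducer reconciles
  //    the literal values;
  //  - slots present on only one side must be literals (extra parameters).
  def compatible(a: Seq[Arg], b: Seq[Arg]): Boolean = {
    val alignedOk = a.zip(b).forall {
      case (ColumnRef(_), ColumnRef(_)) => true
      case (Lit(_), Lit(_))             => true
      case _                            => false
    }
    // zip stops at the shorter side; whatever remains on the longer side is "extra".
    val extra =
      if (a.length >= b.length) a.drop(b.length) else b.drop(a.length)
    alignedOk && extra.forall {
      case Lit(_) => true
      case _      => false
    }
  }
}
```

Under this reading, `raw(x)` vs `mod(x, 2)` (`[column]` vs `[column, literal]`) is accepted, while `[column]` vs `[column, column]` and a literal aligned against a column are still rejected.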
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpression.scala:
##########
@@ -92,24 +126,94 @@ case class TransformExpression(
*/
def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
(function, other.function) match {
- case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
- reducer(e1, numBucketsOpt, e2, other.numBucketsOpt)
+ case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
+ reducer(e1, this, e2, other)
case _ => None
}
}
- // Return a Reducer for a reducible function on another reducible function
+ /**
+ * Extract all literal parameters of this transform as V2 [[V2Literal]]s, preserving each value's
+ * internal representation and its `DataType`. Connectors interpret the value via the accompanying
+ * `DataType` rather than relying on a pre-converted JVM type.
+ *
+ * Examples:
+ * bucket(4, col) => [Literal(4, IntegerType)]
+ * truncate(col, 3) => [Literal(3, IntegerType)]
+ * days(col) => [] (no literals)
+ */
+ private def extractParameters: Array[V2Literal[_]] =
+ literalChildren.map(l => LiteralValue(l.value, l.dataType): V2Literal[_]).toArray
+
+ /**
+ * Reducer precondition: same argument layout/structure as `other` (arity, aligned slots, equal
+ * nested transforms, column refs elsewhere). Only literal *values* may differ. Unlike
+ * [[isSameFunction]] the function name is not compared.
+ */
+ private def sameArgumentLayout(other: TransformExpression): Boolean =
+ childrenMatch(other)((_, _) => true)
+
+ /**
+ * Whether every literal parameter is a scalar (an [[AtomicType]]). Reducer parameters are scalar
+ * literals; this never forwards a complex Catalyst container (ArrayData / MapData / InternalRow)
+ * across the public reducer boundary -- such a transform is simply treated as not reducible.
+ */
+ private def scalarLiteralParams: Boolean =
+ literalChildren.forall(_.dataType.isInstanceOf[AtomicType])
Review Comment:
[P2] `AtomicType` is stricter than the documented non-complex scalar
contract. `CalendarIntervalType` is explicitly non-complex in Spark, although
it does not extend `AtomicType`, and `Expressions.literal(new
CalendarInterval(...))` creates a valid self-describing V2 literal. A connector
whose transform returns a comparable key type can therefore pass
`KeyedPartitioning.supportsExpressions`, yet differing interval parameters are
rejected here before its generalized reducer can reconcile them, forcing an
unnecessary shuffle. The public reducer Javadoc excludes array/map/struct
parameters and otherwise says the new overload supports parameters of any
type. Please reject only those container types rather than all
non-`AtomicType` values, and add a positive `CalendarIntervalType` reducer
test.
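A minimal sketch contrasting the two predicates, assuming a hypothetical mini type hierarchy (`DType`, `Atomic`, `IntegerT`, `StringT`, `CalendarIntervalT`, `ArrayT`, `MapT`, `StructT`) that mirrors, but is not, Spark's `DataType` classes. In Catalyst the analogous match would presumably be on `ArrayType`, `MapType`, and `StructType`; user-defined types are not modeled here.

```scala
// Hypothetical mini type hierarchy that mirrors a few Spark DataType names.
// These are illustrative stand-ins, not org.apache.spark.sql.types classes.
sealed trait DType
sealed trait Atomic extends DType                    // plays the role of AtomicType
case object IntegerT extends Atomic
case object StringT extends Atomic
case object CalendarIntervalT extends DType          // non-complex, but not atomic
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class StructT(fields: Seq[DType]) extends DType

object ParamTypes {
  // Check as written in the PR: every literal parameter must be atomic.
  def atomicOnly(params: Seq[DType]): Boolean =
    params.forall(_.isInstanceOf[Atomic])

  // Suggested check: reject only the container types (array/map/struct) and
  // accept any other scalar type, including the interval stand-in.
  def nonContainer(params: Seq[DType]): Boolean =
    params.forall {
      case _: ArrayT | _: MapT | _: StructT => false
      case _                                => true
    }
}
```

With this shape, `Seq(CalendarIntervalT)` fails `atomicOnly` but passes `nonContainer`, which is the case a positive interval reducer test would pin down; array, map, and struct parameters are still rejected by both.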
--
This is an automated message from the Apache Git Service.