cloud-fan commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3498010517
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4391,6 +4393,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
}
}
+/**
+ * Removes the analysis-only input-type markers ([[ImplicitCastInput]] / [[TypeCheckInput]]) that a
+ * [[DelegateFunction]] inserts to drive or check implicit cast. Once type coercion has run they have
+ * served their purpose, so we strip them at the end of analysis, leaving a clean `definition` in the
+ * [[DelegateExpression]]. Like [[RemoveTempResolvedColumn]], this just unwraps a marker to its
+ * child; it is not load-bearing -- a `DelegateExpression` is correct with or without the markers.
+ */
+object RemoveInputTypeMarkers extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) {
+ case marker: ImplicitCastInput => marker.child
+ case marker: TypeCheckInput => marker.child
Review Comment:
Fixed in `bffecc826d6`. `RemoveInputTypeMarkers` (and its expression-level
`removeMarkers`) now only unwrap a marker `if marker.resolved`, so a
`TypeCheckInput` whose type check failed is left in place and its
`ExpectsInputTypes` failure stays visible to `CheckAnalysis` instead of
exposing a resolved child of the wrong type. Added a regression in
`DelegateExpressionSuite` ("RemoveInputTypeMarkers keeps a failed type-check
marker for CheckAnalysis to report").
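For illustration only, here is a minimal sketch of the `if marker.resolved` guard on a toy expression model. The `Expr`, `Leaf`, and `TypeCheck` types are hypothetical stand-ins, not the Spark classes, and the sketch assumes `resolved` is false exactly when the type check fails.

```scala
// Toy model, NOT Spark's Expression API: every name in this object is hypothetical.
object GuardedUnwrapSketch {
  sealed trait Expr { def resolved: Boolean }

  final case class Leaf(name: String, dataType: String) extends Expr {
    val resolved = true
  }

  // Stand-in for TypeCheckInput: resolved only when the child already has the expected type.
  final case class TypeCheck(child: Leaf, expectedType: String) extends Expr {
    def resolved: Boolean = child.dataType == expectedType
  }

  // Unwrap a marker only when it is resolved; a failed check stays in the tree
  // so a later validation pass can still report it.
  def removeMarkers(e: Expr): Expr = e match {
    case m: TypeCheck if m.resolved => m.child
    case other => other
  }
}
```

In this model a marker whose check fails survives `removeMarkers`, which mirrors the behavior the regression test is meant to pin down.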
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DelegateExpression.scala:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.trees.TreePattern.{DELEGATE_EXPRESSION, INPUT_TYPE_MARKER, TreePattern}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+/**
+ * A transparent, named delegate over a `definition` expression -- a LOGICAL-phase construct.
+ *
+ * `DelegateExpression` lets a high-level function (e.g. `right(a, b)`) stay readable in the analyzed
+ * and optimized logical plan, and lets optimizer rules introduce such nodes (e.g.
+ * `multi_get_json_object`), without hand-written `eval`/`doGenCode`. Every behavior delegates to
+ * `definition`, a real child fully visible to the analyzer and optimizer.
+ *
+ * `name`/`inputs` are purely informational (EXPLAIN/SQL): nothing enforces that `definition` matches
+ * what they claim, so the wrapper is never exposed to physical planning or external systems.
+ * `LowerDelegateExpression` strips it to `definition` in `QueryExecution.createSparkPlan` -- the
+ * single entry point to the planner, used by both the main query and AQE re-planning -- so the
+ * planner and every physical consumer (join-key extraction, data source pushdown, columnar rules,
+ * codegen) sees the real executed expression. (Data source V2 pushdown runs earlier, in the logical
+ * optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`.) The wrapper survives the
+ * logical optimizer, so the optimized plan stays readable and optimizer rules can introduce these
+ * nodes; `eval`/`doGenCode` still delegate, as a safety net if a delegate ever reaches execution.
+ *
+ * Note: because the strip runs before planning, a `DelegateExpression` created by a *physical* rule
+ * (after `createSparkPlan`) is not stripped and may reach an external system un-lowered. That is
+ * acceptable -- like any other expression the system does not recognize, it simply falls back, and
+ * `eval`/`doGenCode` keep it correct within Spark. Analysis- and optimizer-inserted nodes (the
+ * common case) are always stripped, so physical-rule insertion is the only uncovered path.
+ */
+case class DelegateExpression(
+ name: String,
+ inputs: Seq[Expression],
+ definition: Expression)
+ extends Expression with UnaryLike[Expression] {
+
+ override def child: Expression = definition
+ override def dataType: DataType = definition.dataType
+ override def nullable: Boolean = definition.nullable
+ override def foldable: Boolean = definition.foldable
+ override lazy val deterministic: Boolean = definition.deterministic
+ override lazy val canonicalized: Expression = definition.canonicalized
+
+ override def eval(input: InternalRow): Any = definition.eval(input)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+ definition.genCode(ctx)
+
+ final override val nodePatterns: Seq[TreePattern] = Seq(DELEGATE_EXPRESSION)
+ override protected def withNewChildInternal(newChild: Expression): DelegateExpression =
+ copy(definition = newChild)
+
+ override def prettyName: String = name
+ override def sql: String = s"$name(${inputs.map(_.sql).mkString(", ")})"
+ override def toString: String = s"$name(${inputs.mkString(", ")})"
+}
+
+/**
+ * Analysis-only marker that requests an implicit cast of `child` to `expectedType`: it declares the
+ * expected type so the standard `TypeCoercion` rule casts the child, then is removed at the end of
+ * analysis by [[org.apache.spark.sql.catalyst.analysis.RemoveInputTypeMarkers]]. It never reaches
+ * execution, hence [[Unevaluable]]. Modeled on [[org.apache.spark.sql.catalyst.analysis.TempResolvedColumn]].
+ */
+case class ImplicitCastInput(child: Expression, expectedType: AbstractDataType)
+ extends UnaryExpression with Unevaluable with ImplicitCastInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+ override def dataType: DataType = child.dataType
+ override def nullable: Boolean = child.nullable
+ override lazy val canonicalized: Expression = child.canonicalized
+ final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+ override protected def withNewChildInternal(newChild: Expression): ImplicitCastInput =
+ copy(child = newChild)
+}
+
+/**
+ * Analysis-only marker that requires `child` to already match `expectedType` (no cast is inserted),
+ * failing analysis otherwise. Removed at the end of analysis like [[ImplicitCastInput]].
+ */
+case class TypeCheckInput(child: Expression, expectedType: AbstractDataType)
+ extends UnaryExpression with Unevaluable with ExpectsInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+ override def dataType: DataType = child.dataType
+ override def nullable: Boolean = child.nullable
+ override lazy val canonicalized: Expression = child.canonicalized
+ final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+ override protected def withNewChildInternal(newChild: Expression): TypeCheckInput =
+ copy(child = newChild)
+}
+
+/**
+ * The per-function object each built-in function defines (e.g. `object Right extends
+ * DelegateFunction`). It is just an [[ExpressionBuilder]] -- registered with the ordinary
+ * `expressionBuilder(...)`, with its `@ExpressionDescription` annotation read off the object as
+ * usual -- specialized for the delegate pattern: replace the `InheritAnalysisRules` ceremony with
+ * one `lower` method plus a couple of flags. `apply` is the direct-construction entry point.
+ *
+ * Input-type contract, covering all three cases (applied per argument):
+ * - `inputTypes` empty (or `AnyDataType` for a position): accept any type (no check, no cast).
+ * - `inputTypes` set, `implicitCast = true` (default): implicit-cast each arg to its type.
+ * - `inputTypes` set, `implicitCast = false` : type-check each arg, no cast.
+ */
+trait DelegateFunction extends ExpressionBuilder {
+ def name: String
+ def inputTypes: Seq[AbstractDataType] = Nil
+ def implicitCast: Boolean = true
+
+ /** Lower the function into the expression it delegates to. */
+ def lower(args: Seq[Expression]): Expression
+
+ /**
+ * ExpressionBuilder contract: invoked by the registry during function resolution. ONLY this
+ * (analysis-time) path inserts the input-type markers, because the analyzer's `TypeCoercion`
+ * casts them and `RemoveInputTypeMarkers` strips them afterwards.
+ */
+ override def build(funcName: String, expressions: Seq[Expression]): Expression = {
Review Comment:
Fixed in `bffecc826d6`. `DelegateFunction.build` now validates
`expressions.length == inputTypes.length` and throws the structured
`WRONG_NUM_ARGS` error before calling `lower`, so `right('abcd', 1, 99)` no
longer silently ignores the extra arg, and zero- or one-arg calls no longer
index past the end of the argument sequence. An empty `inputTypes` marks a
variadic function whose `lower` owns arg handling, so no arity check is applied
in that case. Added wrong-arity SQL coverage in `DelegateExpressionQuerySuite`
plus a unit test in `DelegateExpressionSuite`.
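As a rough sketch of the arity rule described above, on a toy trait rather than the real `DelegateFunction`: `ToyDelegateFunction`, `WrongNumArgs`, `ToyRight`, and the `right_impl(...)` lowering are all hypothetical names for illustration, and arguments and types are modeled as plain strings.

```scala
// Toy model, NOT the Spark trait: every name in this object is hypothetical.
object ArityCheckSketch {
  // Stand-in for a structured WRONG_NUM_ARGS error.
  final case class WrongNumArgs(name: String, expected: Int, actual: Int)
    extends Exception(s"$name expects $expected argument(s) but got $actual")

  trait ToyDelegateFunction {
    def name: String
    def inputTypes: Seq[String] = Nil
    def lower(args: Seq[String]): String

    def build(args: Seq[String]): String = {
      // Empty inputTypes marks a variadic function: lower owns arg handling, no arity check.
      if (inputTypes.nonEmpty && args.length != inputTypes.length) {
        throw WrongNumArgs(name, inputTypes.length, args.length)
      }
      lower(args)
    }
  }

  object ToyRight extends ToyDelegateFunction {
    val name = "right"
    override val inputTypes: Seq[String] = Seq("string", "int")
    // Made-up lowering; it only needs to index both arguments.
    def lower(args: Seq[String]): String = s"right_impl(${args(0)}, ${args(1)})"
  }
}
```

With this shape, a three-argument call fails in `build` before `lower` runs, and a one-argument call fails with the same error instead of an index-out-of-bounds inside `lower`.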
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/LowerDelegateExpression.scala:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.DelegateExpression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.DELEGATE_EXPRESSION
+
+/**
+ * Strips every [[DelegateExpression]] down to its `definition`. Run on the optimized logical plan in
+ * [[QueryExecution.createSparkPlan]] -- the single entry point to the planner, used by both the main
+ * query and AQE re-planning -- so the planner and every physical consumer (join-key extraction,
+ * V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed expression rather than
+ * the informational wrapper. Data source V2 pushdown runs earlier, in the logical optimizer, so it
+ * unfolds the wrapper separately in `V2ExpressionBuilder`. The wrapper remains in the optimized
+ * logical plan for EXPLAIN.
+ */
+object LowerDelegateExpression extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan.transformAllExpressionsWithPruning(_.containsPattern(DELEGATE_EXPRESSION)) {
Review Comment:
Fixed in `58889e8d7aa`. `LowerDelegateExpression.lower` is now a
tail-recursive unwrap: if a delegate's `definition` is itself a
`DelegateExpression`, the chain is collapsed in one go, so no wrapper survives
the pre-order `transformAllExpressionsWithPruning`. Delegates nested deeper (as
children of `definition`) are still handled by the surrounding tree traversal.
Added a nested-delegate regression ("LowerDelegateExpression fully unwraps a
directly-nested delegate-of-delegate").
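A minimal sketch of the two cases on a toy tree, assuming a pre-order rewrite that visits the children of the rewritten node. `Expr`, `Leaf`, `Add`, `Delegate`, and `lowerAll` are hypothetical stand-ins, not the Spark classes or the real traversal.

```scala
import scala.annotation.tailrec

// Toy model, NOT Spark's tree API: every name in this object is hypothetical.
object LowerChainSketch {
  sealed trait Expr
  final case class Leaf(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Delegate(name: String, definition: Expr) extends Expr

  // Collapse a direct delegate-of-delegate chain down to the first non-delegate definition.
  @tailrec
  def lower(e: Expr): Expr = e match {
    case Delegate(_, definition) => lower(definition)
    case other => other
  }

  // Pre-order traversal: rewrite the node first, then recurse into the result's children,
  // which picks up delegates nested deeper inside a definition.
  def lowerAll(e: Expr): Expr = lower(e) match {
    case Add(l, r) => Add(lowerAll(l), lowerAll(r))
    case other => other
  }
}
```

Without the recursive `lower`, a single-step unwrap at the root would return the inner `Delegate`, and a traversal that only descends into that node's children would never rewrite it.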
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -94,6 +94,8 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
private def generateExpression(
expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = expr match {
case literal: Literal => Some(translateLiteral(literal))
+ // DelegateExpression is a Spark-internal wrapper; push down its real definition instead.
+ case d: DelegateExpression => generateExpression(d.definition, isPredicate)
Review Comment:
Fixed in `bffecc826d6`, with the "prefer an exact map entry before
structural descent" option. `rebuildExpressionFromFilter` now checks
`translatedFilterToExpr.get(predicate)` before matching `V2And`/`V2Or`/`V2Not`,
so a compound-`definition` delegate that translated to a single mapped
`V2And`/`V2Or` is restored directly instead of recursing into its synthetic,
unmapped children (which threw `Failed to rebuild Expression for filter`). This
is granularity-correct at every level, so a delegate nested inside an ordinary
compound is covered too; normal compound filters are decomposed at translation
(only leaves mapped), so the exact lookup misses and behavior is unchanged.
Added a rejected-filter round-trip regression in `DataSourceV2StrategySuite`.
(Same root cause as the pre-redesign V2 finding, now applied to
`DelegateExpression`.)
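To make the lookup order concrete, here is a small sketch on a toy model. `V2Pred`, `V2Leaf`, the local `V2And`, and `rebuild` are hypothetical stand-ins, and Catalyst expressions are modeled as plain strings; this is not the actual `rebuildExpressionFromFilter` code.

```scala
// Toy model, NOT the Spark connector API: every name in this object is hypothetical.
object RebuildSketch {
  sealed trait V2Pred
  final case class V2Leaf(name: String) extends V2Pred
  final case class V2And(left: V2Pred, right: V2Pred) extends V2Pred

  // Map a V2 predicate back to its original expression (modeled as a String).
  def rebuild(p: V2Pred, translated: Map[V2Pred, String]): String =
    translated.get(p) match {
      // An exact entry wins, even when the predicate is a compound.
      case Some(expr) => expr
      // Otherwise descend structurally; only unmapped leaves are an error.
      case None => p match {
        case V2And(l, r) => s"(${rebuild(l, translated)} AND ${rebuild(r, translated)})"
        case leaf => throw new IllegalStateException(s"Failed to rebuild Expression for filter: $leaf")
      }
    }
}
```

A delegate that translated to one mapped `V2And` hits the first case. An ordinary compound has only its leaves in the map, so the exact lookup misses and the structural path runs as before.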
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]