sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3484671312
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -583,7 +583,9 @@ class Analyzer(
Seq(ResolveUpdateEventTimeWatermarkColumn) ++
extendedResolutionRules ++
Seq(NameStreamingSources) : _*),
- Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
+ Batch("Remove analysis-only markers", Once,
+ RemoveTempResolvedColumn,
+ RemoveInputTypeMarkers),
Review Comment:
[P2] Remove input markers on the single-pass analyzer path
This cleanup batch only runs in the fixed-point `Analyzer`. With
`spark.sql.analyzer.singlePassResolver.enabled=true`, `right(...)` is resolved
and its `ImplicitCastInput` succeeds, but `ResolverRunner` marks the plan
analyzed without running `RemoveInputTypeMarkers`. `LowerDelegateExpression`
then removes only the outer delegate, leaving the `Unevaluable` marker in the
executable expression; for example, `SELECT right('abc', 1)` fails during
evaluation/code generation. The base `Right` used no marker nodes and worked in
this mode. Could we add equivalent cleanup to the single-pass path and an
execution-level regression test?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4391,6 +4393,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
}
}
+/**
+ * Removes the analysis-only input-type markers ([[ImplicitCastInput]] / [[TypeCheckInput]]) that a
+ * [[DelegateFunction]] inserts to drive or check implicit cast. Once type coercion has run they have
+ * served their purpose, so we strip them at the end of analysis, leaving a clean `definition` in the
+ * [[DelegateExpression]]. Like [[RemoveTempResolvedColumn]], this just unwraps a marker to its
+ * child; it is not load-bearing -- a `DelegateExpression` is correct with or without the markers.
+ */
+object RemoveInputTypeMarkers extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) {
+ case marker: ImplicitCastInput => marker.child
+ case marker: TypeCheckInput => marker.child
Review Comment:
[P2] Keep failed type checks visible to `CheckAnalysis`
This unwraps every `TypeCheckInput`, including one whose type check failed.
For example, `TypeCheckInput(Literal(1L), IntegerType)` is unresolved before
this rule, but an identity-lowering `DelegateFunction` with `implicitCast =
false` becomes fully resolved after the marker is replaced by its `Long` child,
so analysis accepts the mismatched argument. The new unit test only calls
`checkInputDataTypes()` before cleanup and therefore misses the end-to-end
behavior. Could we unwrap only successfully resolved markers (leaving failures
for `CheckAnalysis`) and add an analyzer-level regression?
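
A minimal sketch of the "unwrap only resolved markers" idea, using a toy expression model rather than Spark's classes. `MarkerSketch`, `Expr`, `Lit`, `TypeCheck`, and the string-typed `tpe` are all hypothetical stand-ins; the real rule would presumably key off `resolved` or `checkInputDataTypes()`.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark's.
object MarkerSketch {
  sealed trait Expr {
    def tpe: String
    def resolved: Boolean
  }

  final case class Lit(value: Any, tpe: String) extends Expr {
    def resolved: Boolean = true
  }

  // Resolved only when the child already has the expected type.
  final case class TypeCheck(child: Expr, expected: String) extends Expr {
    def tpe: String = child.tpe
    def resolved: Boolean = child.resolved && child.tpe == expected
  }

  // Unwrap markers whose check passed; keep failed ones in the tree so a
  // later validation step can still see and report them.
  def removeMarkers(e: Expr): Expr = e match {
    case m: TypeCheck if m.resolved => removeMarkers(m.child)
    case m: TypeCheck => m.copy(child = removeMarkers(m.child))
    case other => other
  }
}
```

With this shape, a marker wrapping a `bigint` literal where `int` is expected stays in the tree and stays unresolved, so the mismatch is not silently accepted.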
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DelegateExpression.scala:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.trees.TreePattern.{DELEGATE_EXPRESSION, INPUT_TYPE_MARKER, TreePattern}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+/**
+ * A transparent, named delegate over a `definition` expression -- a
LOGICAL-phase construct.
+ *
+ * `DelegateExpression` lets a high-level function (e.g. `right(a, b)`) stay
readable in the analyzed
+ * and optimized logical plan, and lets optimizer rules introduce such nodes
(e.g.
+ * `multi_get_json_object`), without hand-written `eval`/`doGenCode`. Every
behavior delegates to
+ * `definition`, a real child fully visible to the analyzer and optimizer.
+ *
+ * `name`/`inputs` are purely informational (EXPLAIN/SQL): nothing enforces
that `definition` matches
+ * what they claim, so the wrapper is never exposed to physical planning or
external systems.
+ * `LowerDelegateExpression` strips it to `definition` in
`QueryExecution.createSparkPlan` -- the
+ * single entry point to the planner, used by both the main query and AQE
re-planning -- so the
+ * planner and every physical consumer (join-key extraction, data source
pushdown, columnar rules,
+ * codegen) sees the real executed expression. (Data source V2 pushdown runs
earlier, in the logical
+ * optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`.)
The wrapper survives the
+ * logical optimizer, so the optimized plan stays readable and optimizer rules
can introduce these
+ * nodes; `eval`/`doGenCode` still delegate, as a safety net if a delegate
ever reaches execution.
+ *
+ * Note: because the strip runs before planning, a `DelegateExpression`
created by a *physical* rule
+ * (after `createSparkPlan`) is not stripped and may reach an external system
un-lowered. That is
+ * acceptable -- like any other expression the system does not recognize, it
simply falls back, and
+ * `eval`/`doGenCode` keep it correct within Spark. Analysis- and
optimizer-inserted nodes (the
+ * common case) are always stripped, so physical-rule insertion is the only
uncovered path.
+ */
+case class DelegateExpression(
+ name: String,
+ inputs: Seq[Expression],
+ definition: Expression)
+ extends Expression with UnaryLike[Expression] {
+
+ override def child: Expression = definition
+ override def dataType: DataType = definition.dataType
+ override def nullable: Boolean = definition.nullable
+ override def foldable: Boolean = definition.foldable
+ override lazy val deterministic: Boolean = definition.deterministic
+ override lazy val canonicalized: Expression = definition.canonicalized
+
+ override def eval(input: InternalRow): Any = definition.eval(input)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+ definition.genCode(ctx)
+
+ final override val nodePatterns: Seq[TreePattern] = Seq(DELEGATE_EXPRESSION)
+ override protected def withNewChildInternal(newChild: Expression): DelegateExpression =
+ copy(definition = newChild)
+
+ override def prettyName: String = name
+ override def sql: String = s"$name(${inputs.map(_.sql).mkString(", ")})"
+ override def toString: String = s"$name(${inputs.mkString(", ")})"
+}
+
+/**
+ * Analysis-only marker that requests an implicit cast of `child` to
`expectedType`: it declares the
+ * expected type so the standard `TypeCoercion` rule casts the child, then is
removed at the end of
+ * analysis by
[[org.apache.spark.sql.catalyst.analysis.RemoveInputTypeMarkers]]. It never
reaches
+ * execution, hence [[Unevaluable]]. Modeled on
[[org.apache.spark.sql.catalyst.analysis.TempResolvedColumn]].
+ */
+case class ImplicitCastInput(child: Expression, expectedType: AbstractDataType)
+ extends UnaryExpression with Unevaluable with ImplicitCastInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+ override def dataType: DataType = child.dataType
+ override def nullable: Boolean = child.nullable
+ override lazy val canonicalized: Expression = child.canonicalized
+ final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+ override protected def withNewChildInternal(newChild: Expression): ImplicitCastInput =
+ copy(child = newChild)
+}
+
+/**
+ * Analysis-only marker that requires `child` to already match `expectedType`
(no cast is inserted),
+ * failing analysis otherwise. Removed at the end of analysis like
[[ImplicitCastInput]].
+ */
+case class TypeCheckInput(child: Expression, expectedType: AbstractDataType)
+ extends UnaryExpression with Unevaluable with ExpectsInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+ override def dataType: DataType = child.dataType
+ override def nullable: Boolean = child.nullable
+ override lazy val canonicalized: Expression = child.canonicalized
+ final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+ override protected def withNewChildInternal(newChild: Expression): TypeCheckInput =
+ copy(child = newChild)
+}
+
+/**
+ * The per-function object each built-in function defines (e.g. `object Right
extends
+ * DelegateFunction`). It is just an [[ExpressionBuilder]] -- registered with
the ordinary
+ * `expressionBuilder(...)`, with its `@ExpressionDescription` annotation read
off the object as
+ * usual -- specialized for the delegate pattern: replace the
`InheritAnalysisRules` ceremony with
+ * one `lower` method plus a couple of flags. `apply` is the
direct-construction entry point.
+ *
+ * Input-type contract, covering all three cases (applied per argument):
+ * - `inputTypes` empty (or `AnyDataType` for a position): accept any type
(no check, no cast).
+ * - `inputTypes` set, `implicitCast = true` (default): implicit-cast each
arg to its type.
+ * - `inputTypes` set, `implicitCast = false` : type-check each
arg, no cast.
+ */
+trait DelegateFunction extends ExpressionBuilder {
+ def name: String
+ def inputTypes: Seq[AbstractDataType] = Nil
+ def implicitCast: Boolean = true
+
+ /** Lower the function into the expression it delegates to. */
+ def lower(args: Seq[Expression]): Expression
+
+ /**
+ * ExpressionBuilder contract: invoked by the registry during function
resolution. ONLY this
+ * (analysis-time) path inserts the input-type markers, because the
analyzer's `TypeCoercion`
+ * casts them and `RemoveInputTypeMarkers` strips them afterwards.
+ */
+ override def build(funcName: String, expressions: Seq[Expression]): Expression = {
Review Comment:
[P2] Validate each delegate function's argument count
`build` currently accepts any number of expressions. `Right.lower` consumes
only positions 0 and 1, so `right('abcd', 1, 99)` succeeds and silently ignores
`99`, while zero/one-argument calls fail by indexing the sequence instead of
returning the structured `WRONG_NUM_ARGS` error. The previous
`expression[Right]` registration enforced its two-expression constructor. Could
`DelegateFunction` expose a `functionSignature`, or otherwise validate arity
before calling `lower`, with wrong-arity SQL coverage?
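
A rough sketch of validating arity before `lower`, again with toy types. `ToyDelegateFunction`, `arity`, and `WrongNumArgs` are hypothetical names for illustration; the real fix would raise Spark's structured `WRONG_NUM_ARGS` error, and the string-based `lower` here is a simplification of the actual lowering.

```scala
// Toy model only: names are hypothetical, expressions are plain strings.
object AritySketch {
  final case class WrongNumArgs(name: String, expected: Int, actual: Int)
    extends Exception(s"$name expects $expected argument(s) but got $actual")

  trait ToyDelegateFunction {
    def name: String
    def arity: Int
    protected def lower(args: Seq[String]): String

    // Check the argument count once, centrally, so `lower` can index safely
    // and extra arguments are rejected instead of silently ignored.
    final def build(args: Seq[String]): String = {
      if (args.length != arity) throw WrongNumArgs(name, arity, args.length)
      lower(args)
    }
  }

  object ToyRight extends ToyDelegateFunction {
    val name = "right"
    val arity = 2
    protected def lower(args: Seq[String]): String =
      s"substring(${args(0)}, -${args(1)})"
  }
}
```

Here `ToyRight.build(Seq("'abcd'", "1", "99"))` throws `WrongNumArgs` rather than dropping `99`, and a one-argument call fails the same way instead of with an index error.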
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:
##########
@@ -2359,29 +2359,24 @@ case class Substring(str: Expression, pos: Expression,
len: Expression)
since = "2.3.0",
group = "string_funcs")
// scalastyle:on line.size.limit
-case class Right(str: Expression, len: Expression) extends RuntimeReplaceable
- with ImplicitCastInputTypes with BinaryLike[Expression] {
-
- override lazy val replacement: Expression = If(
- IsNull(str),
- Literal(null, str.dataType),
- If(
- LessThanOrEqual(len, Literal(0)),
- Literal(UTF8String.EMPTY_UTF8, str.dataType),
- new Substring(str, UnaryMinus(len, failOnError = false))
- )
- )
+object Right extends DelegateFunction {
+ override val name: String = "right"
override def inputTypes: Seq[AbstractDataType] =
- Seq(
- StringTypeWithCollation(supportsTrimCollation = true),
- IntegerType
- )
- override def left: Expression = str
- override def right: Expression = len
- override protected def withNewChildrenInternal(
- newLeft: Expression, newRight: Expression): Expression = {
- copy(str = newLeft, len = newRight)
+ Seq(StringTypeWithCollation(supportsTrimCollation = true), IntegerType)
+
+ // NOTE: runs at parse time on unresolved args, so it must not read an input's `.dataType`.
+ // The `If` branch types are unified later by type coercion.
+ override def lower(args: Seq[Expression]): Expression = {
+ val str = args(0)
+ val len = args(1)
+ If(
+ IsNull(str),
+ Literal(null, StringType),
Review Comment:
[P2] Preserve the resolved input type for these branch literals
Hard-coding plain `StringType` changes the result schema when the public
`spark.sql.preserveCharVarcharTypeInfo` setting is enabled. For example,
`typeof(right(CAST('abc' AS CHAR(5)), 2))` returned `char(5)` on the base
implementation, whose null/empty literals used `str.dataType`, but returns
`string` on this head because `If` now has to reconcile the `CHAR(5)` substring
branch with these `STRING` literal branches and widens the result to `STRING`.
Registry expression builders receive resolved arguments, and
the marker's `dataType` delegates to its child, so using `str.dataType` here is
safe. Could we preserve the old literal typing and add CHAR/VARCHAR result-type
coverage?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/LowerDelegateExpression.scala:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.DelegateExpression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.DELEGATE_EXPRESSION
+
+/**
+ * Strips every [[DelegateExpression]] down to its `definition`. Run on the
optimized logical plan in
+ * [[QueryExecution.createSparkPlan]] -- the single entry point to the
planner, used by both the main
+ * query and AQE re-planning -- so the planner and every physical consumer
(join-key extraction,
+ * V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed
expression rather than
+ * the informational wrapper. Data source V2 pushdown runs earlier, in the
logical optimizer, so it
+ * unfolds the wrapper separately in `V2ExpressionBuilder`. The wrapper
remains in the optimized
+ * logical plan for EXPLAIN.
+ */
+object LowerDelegateExpression extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan.transformAllExpressionsWithPruning(_.containsPattern(DELEGATE_EXPRESSION)) {
Review Comment:
[P2] Fully lower a delegate whose definition is another delegate
`transformAllExpressionsWithPruning` is pre-order. If an outer delegate's
definition is itself a `DelegateExpression`, replacing the outer node causes
traversal to continue only through the replacement's children; the replacement
root is not matched again. One pass therefore leaves the inner delegate in the
plan, despite this rule's `strips every DelegateExpression` contract. A nested
predicate delegate can consequently remain opaque to join-key extraction or
physical pushdown. Could this use bottom-up/recursive lowering and include a
nested-delegate regression?
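
To illustrate the traversal-order point, here is a small self-contained model of pre-order versus bottom-up rewriting. It mirrors the general shape of `transformDown`/`transformUp` but is a toy, not Spark's `TreeNode`; `LoweringSketch`, `Delegate`, `Leaf`, and `Not` are hypothetical names.

```scala
// Toy model only: a three-node expression language, not Spark's TreeNode.
object LoweringSketch {
  sealed trait Expr
  final case class Leaf(name: String) extends Expr
  final case class Not(child: Expr) extends Expr
  final case class Delegate(name: String, definition: Expr) extends Expr

  def mapChildren(e: Expr)(f: Expr => Expr): Expr = e match {
    case Not(c) => Not(f(c))
    case Delegate(n, d) => Delegate(n, f(d))
    case l: Leaf => l
  }

  // Pre-order: rewrite the node, then recurse into the children of the
  // result. The replacement root itself is never matched again.
  def transformDown(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(e, identity[Expr])
    mapChildren(afterRule)(transformDown(_)(rule))
  }

  // Bottom-up: rewrite the children first, then the node itself.
  def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = mapChildren(e)(transformUp(_)(rule))
    rule.applyOrElse(withNewChildren, identity[Expr])
  }

  val strip: PartialFunction[Expr, Expr] = { case Delegate(_, d) => d }
  val nested: Expr = Delegate("outer", Delegate("inner", Leaf("x")))
}
```

In this model, `transformDown(nested)(strip)` leaves `Delegate("inner", Leaf("x"))` behind, while `transformUp(nested)(strip)` yields `Leaf("x")`. A recursive strip in the rule body would be another way to get the same result.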
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -94,6 +94,8 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean
= false) extends L
private def generateExpression(
expr: Expression, isPredicate: Boolean = false): Option[V2Expression] =
expr match {
case literal: Literal => Some(translateLiteral(literal))
+ // DelegateExpression is a Spark-internal wrapper; push down its real definition instead.
+ case d: DelegateExpression => generateExpression(d.definition, isPredicate)
Review Comment:
[P2] Preserve reconstruction mappings for compound delegate predicates
`translateFilterV2WithMapping` treats the wrapper as its leaf case, but this
new recursion can return a structural `V2And`/`V2Or`. The map then contains
only `compoundPredicate -> originalDelegate`. If a `SupportsPushDownV2Filters`
source returns that predicate unchanged because it cannot fully push it,
`rebuildExpressionFromFilter` decomposes the compound node before consulting
the map and fails on its synthetic, unmapped children with `Failed to rebuild
Expression for filter`. Could we unfold before the mapping-aware recursion, or
prefer an exact map entry before structural descent, and add a rejected-filter
round-trip test?
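
A simplified sketch of the "prefer an exact map entry before structural descent" option. The types and both `rebuild*` functions are hypothetical toys: the Catalyst side is a plain string, and failure is modelled as `None` rather than the exception the real code throws.

```scala
// Toy model only: V2 predicates as a tiny ADT, Catalyst expressions as strings.
object RebuildSketch {
  sealed trait V2Pred
  final case class V2Leaf(sql: String) extends V2Pred
  final case class V2And(left: V2Pred, right: V2Pred) extends V2Pred

  type Mapping = Map[V2Pred, String]

  // Structural descent first: a compound node is always decomposed, so an
  // entry keyed by the compound node itself is never consulted.
  def rebuildStructuralFirst(p: V2Pred, m: Mapping): Option[String] = p match {
    case V2And(l, r) =>
      for {
        a <- rebuildStructuralFirst(l, m)
        b <- rebuildStructuralFirst(r, m)
      } yield s"($a AND $b)"
    case leaf => m.get(leaf)
  }

  // Exact entry first: decompose only when the node itself is unmapped.
  def rebuildExactFirst(p: V2Pred, m: Mapping): Option[String] =
    m.get(p).orElse(p match {
      case V2And(l, r) =>
        for {
          a <- rebuildExactFirst(l, m)
          b <- rebuildExactFirst(r, m)
        } yield s"($a AND $b)"
      case _ => None
    })
}
```

When the only entry is `compound -> delegate`, the structural-first variant fails on the unmapped children, while the exact-first variant recovers the original delegate.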
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -814,7 +814,11 @@ case class AdaptiveSparkPlanExec(
try {
logicalPlan.invalidateStatsCache()
val optimized = optimizer.execute(logicalPlan)
- val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
+ // Go through QueryExecution.createSparkPlan -- the single place that strips DelegateExpression
+ // before planning -- instead of calling the planner directly, so the re-planned stage sees the
+ // real executed expression.
+ val sparkPlan = QueryExecution.createSparkPlan(
Review Comment:
[P2] Return the same logical tree that AQE used for planning
When a runtime optimizer rule inserts a `DelegateExpression` and the
resulting physical plan is adopted, `createSparkPlan` plans a lowered copy
whose nodes become the physical plan's `logicalLink` targets, while this method
returns the original unlowered `optimized` tree below. If a copied node backs a
later query stage, `replaceWithQueryStagesInLogicalPlan` cannot find it using
reference equality, so the completed stage is not inserted into
`currentLogicalPlan` and subsequent runtime optimization loses that stage's
statistics. This is conditional on the new plan being adopted, not every
runtime-added wrapper. Could AQE retain and return the exact lowered tree used
for planning, with an injected runtime-optimizer/staged-query regression?
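
The reference-equality concern can be shown with a toy tree. `Node` and `replaceByRef` are hypothetical and only imitate the lookup style described above; they are not AQE's actual code.

```scala
// Toy model only: a plain tree with a reference-equality replacement.
object LogicalLinkSketch {
  final case class Node(name: String, children: Seq[Node] = Nil)

  // Replace `target` inside `tree`, matching by reference (`eq`), not by
  // structural equality.
  def replaceByRef(tree: Node, target: Node, replacement: Node): Node =
    if (tree eq target) replacement
    else tree.copy(children = tree.children.map(replaceByRef(_, target, replacement)))
}
```

If the physical plan links to nodes of a lowered copy, a lookup of that link in the original tree finds nothing even though the two trees are structurally equal; the lookup succeeds only in the exact tree that was planned.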
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]