sunchao commented on code in PR #56575:
URL: https://github.com/apache/spark/pull/56575#discussion_r3484671312


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -583,7 +583,9 @@ class Analyzer(
       Seq(ResolveUpdateEventTimeWatermarkColumn) ++
       extendedResolutionRules ++
       Seq(NameStreamingSources) : _*),
-    Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
+    Batch("Remove analysis-only markers", Once,
+      RemoveTempResolvedColumn,
+      RemoveInputTypeMarkers),

Review Comment:
   [P2] Remove input markers on the single-pass analyzer path
   
   This cleanup batch only runs in the fixed-point `Analyzer`. With 
`spark.sql.analyzer.singlePassResolver.enabled=true`, `right(...)` is resolved 
and its `ImplicitCastInput` succeeds, but `ResolverRunner` marks the plan 
analyzed without running `RemoveInputTypeMarkers`. `LowerDelegateExpression` 
then removes only the outer delegate, leaving the `Unevaluable` marker in the 
executable expression; for example, `SELECT right('abc', 1)` fails during 
evaluation/code generation. The base `Right` used no marker nodes and worked in 
this mode. Could we add equivalent cleanup to the single-pass path and an 
execution-level regression test?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4391,6 +4393,21 @@ object EliminateUnions extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes the analysis-only input-type markers ([[ImplicitCastInput]] / [[TypeCheckInput]]) that a
+ * [[DelegateFunction]] inserts to drive or check implicit cast. Once type coercion has run they have
+ * served their purpose, so we strip them at the end of analysis, leaving a clean `definition` in the
+ * [[DelegateExpression]]. Like [[RemoveTempResolvedColumn]], this just unwraps a marker to its
+ * child; it is not load-bearing -- a `DelegateExpression` is correct with or without the markers.
+ */
+object RemoveInputTypeMarkers extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan.resolveExpressionsWithPruning(_.containsPattern(INPUT_TYPE_MARKER)) {
+      case marker: ImplicitCastInput => marker.child
+      case marker: TypeCheckInput => marker.child

Review Comment:
   [P2] Keep failed type checks visible to `CheckAnalysis`
   
   This unwraps every `TypeCheckInput`, including one whose type check failed. 
For example, `TypeCheckInput(Literal(1L), IntegerType)` is unresolved before 
this rule, but an identity-lowering `DelegateFunction` with `implicitCast = 
false` becomes fully resolved after the marker is replaced by its `Long` child, 
so analysis accepts the mismatched argument. The new unit test only calls 
`checkInputDataTypes()` before cleanup and therefore misses the end-to-end 
behavior. Could we unwrap only successfully resolved markers (leaving failures 
for `CheckAnalysis`) and add an analyzer-level regression?
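
   A minimal, self-contained sketch of the suggested behavior, using a toy expression model rather than Catalyst's real classes. Every name below (`Expr`, `Lit`, `Marker`, `Call`, `stripResolvedMarkers`) is a hypothetical stand-in, and types are plain strings for brevity. The point is only that a marker is unwrapped when its check passes and is otherwise left in the tree for a later validation pass to report:

```scala
object MarkerCleanupSketch {
  // Toy expression tree; data types are modeled as plain strings (assumption).
  sealed trait Expr { def dataType: String }
  final case class Lit(value: Any, dataType: String) extends Expr
  final case class Call(name: String, args: List[Expr], dataType: String) extends Expr

  // Stand-in for TypeCheckInput: reports its child's type and carries the expected one.
  final case class Marker(child: Expr, expectedType: String) extends Expr {
    def dataType: String = child.dataType
    def typeCheckPasses: Boolean = child.dataType == expectedType
  }

  // Unwrap a marker only when its type check succeeds; keep failed markers in place.
  def stripResolvedMarkers(e: Expr): Expr = e match {
    case m: Marker =>
      val newChild = stripResolvedMarkers(m.child)
      val rebuilt = m.copy(child = newChild)
      if (rebuilt.typeCheckPasses) newChild else rebuilt
    case c: Call => c.copy(args = c.args.map(stripResolvedMarkers))
    case lit: Lit => lit
  }

  def main(args: Array[String]): Unit = {
    val ok = Call("f", List(Marker(Lit(1, "int"), "int")), "int")
    val bad = Call("f", List(Marker(Lit(1L, "long"), "int")), "long")
    println(stripResolvedMarkers(ok))  // marker removed
    println(stripResolvedMarkers(bad)) // marker kept, so the mismatch stays visible
  }
}
```

   In the real rule this would presumably be a guard on the two `case marker` branches; the exact condition is left to the author.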



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DelegateExpression.scala:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.catalyst.trees.TreePattern.{DELEGATE_EXPRESSION, INPUT_TYPE_MARKER, TreePattern}
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+/**
+ * A transparent, named delegate over a `definition` expression -- a LOGICAL-phase construct.
+ *
+ * `DelegateExpression` lets a high-level function (e.g. `right(a, b)`) stay readable in the analyzed
+ * and optimized logical plan, and lets optimizer rules introduce such nodes (e.g.
+ * `multi_get_json_object`), without hand-written `eval`/`doGenCode`. Every behavior delegates to
+ * `definition`, a real child fully visible to the analyzer and optimizer.
+ *
+ * `name`/`inputs` are purely informational (EXPLAIN/SQL): nothing enforces that `definition` matches
+ * what they claim, so the wrapper is never exposed to physical planning or external systems.
+ * `LowerDelegateExpression` strips it to `definition` in `QueryExecution.createSparkPlan` -- the
+ * single entry point to the planner, used by both the main query and AQE re-planning -- so the
+ * planner and every physical consumer (join-key extraction, data source pushdown, columnar rules,
+ * codegen) sees the real executed expression. (Data source V2 pushdown runs earlier, in the logical
+ * optimizer, so it unfolds the wrapper directly in `V2ExpressionBuilder`.) The wrapper survives the
+ * logical optimizer, so the optimized plan stays readable and optimizer rules can introduce these
+ * nodes; `eval`/`doGenCode` still delegate, as a safety net if a delegate ever reaches execution.
+ *
+ * Note: because the strip runs before planning, a `DelegateExpression` created by a *physical* rule
+ * (after `createSparkPlan`) is not stripped and may reach an external system un-lowered. That is
+ * acceptable -- like any other expression the system does not recognize, it simply falls back, and
+ * `eval`/`doGenCode` keep it correct within Spark. Analysis- and optimizer-inserted nodes (the
+ * common case) are always stripped, so physical-rule insertion is the only uncovered path.
+ */
+case class DelegateExpression(
+    name: String,
+    inputs: Seq[Expression],
+    definition: Expression)
+  extends Expression with UnaryLike[Expression] {
+
+  override def child: Expression = definition
+  override def dataType: DataType = definition.dataType
+  override def nullable: Boolean = definition.nullable
+  override def foldable: Boolean = definition.foldable
+  override lazy val deterministic: Boolean = definition.deterministic
+  override lazy val canonicalized: Expression = definition.canonicalized
+
+  override def eval(input: InternalRow): Any = definition.eval(input)
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
+    definition.genCode(ctx)
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(DELEGATE_EXPRESSION)
+  override protected def withNewChildInternal(newChild: Expression): DelegateExpression =
+    copy(definition = newChild)
+
+  override def prettyName: String = name
+  override def sql: String = s"$name(${inputs.map(_.sql).mkString(", ")})"
+  override def toString: String = s"$name(${inputs.mkString(", ")})"
+}
+
+/**
+ * Analysis-only marker that requests an implicit cast of `child` to `expectedType`: it declares the
+ * expected type so the standard `TypeCoercion` rule casts the child, then is removed at the end of
+ * analysis by [[org.apache.spark.sql.catalyst.analysis.RemoveInputTypeMarkers]]. It never reaches
+ * execution, hence [[Unevaluable]]. Modeled on [[org.apache.spark.sql.catalyst.analysis.TempResolvedColumn]].
+ */
+case class ImplicitCastInput(child: Expression, expectedType: AbstractDataType)
+  extends UnaryExpression with Unevaluable with ImplicitCastInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+  override def dataType: DataType = child.dataType
+  override def nullable: Boolean = child.nullable
+  override lazy val canonicalized: Expression = child.canonicalized
+  final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+  override protected def withNewChildInternal(newChild: Expression): ImplicitCastInput =
+    copy(child = newChild)
+}
+
+/**
+ * Analysis-only marker that requires `child` to already match `expectedType` (no cast is inserted),
+ * failing analysis otherwise. Removed at the end of analysis like [[ImplicitCastInput]].
+ */
+case class TypeCheckInput(child: Expression, expectedType: AbstractDataType)
+  extends UnaryExpression with Unevaluable with ExpectsInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(expectedType)
+  override def dataType: DataType = child.dataType
+  override def nullable: Boolean = child.nullable
+  override lazy val canonicalized: Expression = child.canonicalized
+  final override val nodePatterns: Seq[TreePattern] = Seq(INPUT_TYPE_MARKER)
+  override protected def withNewChildInternal(newChild: Expression): TypeCheckInput =
+    copy(child = newChild)
+}
+
+/**
+ * The per-function object each built-in function defines (e.g. `object Right extends
+ * DelegateFunction`). It is just an [[ExpressionBuilder]] -- registered with the ordinary
+ * `expressionBuilder(...)`, with its `@ExpressionDescription` annotation read off the object as
+ * usual -- specialized for the delegate pattern: replace the `InheritAnalysisRules` ceremony with
+ * one `lower` method plus a couple of flags. `apply` is the direct-construction entry point.
+ *
+ * Input-type contract, covering all three cases (applied per argument):
+ *   - `inputTypes` empty (or `AnyDataType` for a position): accept any type (no check, no cast).
+ *   - `inputTypes` set, `implicitCast = true`  (default): implicit-cast each arg to its type.
+ *   - `inputTypes` set, `implicitCast = false`           : type-check each arg, no cast.
+ */
+trait DelegateFunction extends ExpressionBuilder {
+  def name: String
+  def inputTypes: Seq[AbstractDataType] = Nil
+  def implicitCast: Boolean = true
+
+  /** Lower the function into the expression it delegates to. */
+  def lower(args: Seq[Expression]): Expression
+
+  /**
+   * ExpressionBuilder contract: invoked by the registry during function resolution. ONLY this
+   * (analysis-time) path inserts the input-type markers, because the analyzer's `TypeCoercion`
+   * casts them and `RemoveInputTypeMarkers` strips them afterwards.
+   */
+  override def build(funcName: String, expressions: Seq[Expression]): Expression = {

Review Comment:
   [P2] Validate each delegate function's argument count
   
   `build` currently accepts any number of expressions. `Right.lower` consumes 
only positions 0 and 1, so `right('abcd', 1, 99)` succeeds and silently ignores 
`99`, while zero/one-argument calls fail by indexing the sequence instead of 
returning the structured `WRONG_NUM_ARGS` error. The previous 
`expression[Right]` registration enforced its two-expression constructor. Could 
`DelegateFunction` expose a `functionSignature`, or otherwise validate arity 
before calling `lower`, with wrong-arity SQL coverage?
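
   To illustrate the shape of the check being asked for, here is a small standalone sketch. It does not use Spark's real `ExpressionBuilder` or error framework: `Fn`, `expectedArgs`, `build`, and the message format are all hypothetical, and arguments are modeled as strings. The arity is validated before `lower` ever indexes into the argument sequence:

```scala
object ArityCheckSketch {
  // Hypothetical stand-in for a delegate function that declares how many arguments it takes.
  final case class Fn(name: String, expectedArgs: Int, lower: Seq[String] => String)

  // Validate arity first, so `lower` can index positions safely and no argument is ignored.
  // The Left message only imitates a structured error; it is not Spark's actual format.
  def build(fn: Fn, args: Seq[String]): Either[String, String] =
    if (args.length != fn.expectedArgs) {
      Left(s"WRONG_NUM_ARGS: ${fn.name} expects ${fn.expectedArgs} argument(s), got ${args.length}")
    } else {
      Right(fn.lower(args))
    }

  // Toy version of right(str, len); the lowered form is just a descriptive string here.
  val rightFn: Fn = Fn("right", 2, args => s"substring(${args(0)}, -${args(1)})")

  def main(args: Array[String]): Unit = {
    println(build(rightFn, Seq("'abcd'", "1")))       // lowered normally
    println(build(rightFn, Seq("'abcd'", "1", "99"))) // rejected instead of silently dropping 99
    println(build(rightFn, Seq.empty))                // rejected instead of an index failure
  }
}
```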



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:
##########
@@ -2359,29 +2359,24 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
   since = "2.3.0",
   group = "string_funcs")
 // scalastyle:on line.size.limit
-case class Right(str: Expression, len: Expression) extends RuntimeReplaceable
-  with ImplicitCastInputTypes with BinaryLike[Expression] {
-
-  override lazy val replacement: Expression = If(
-    IsNull(str),
-    Literal(null, str.dataType),
-    If(
-      LessThanOrEqual(len, Literal(0)),
-      Literal(UTF8String.EMPTY_UTF8, str.dataType),
-      new Substring(str, UnaryMinus(len, failOnError = false))
-    )
-  )
+object Right extends DelegateFunction {
+  override val name: String = "right"
 
   override def inputTypes: Seq[AbstractDataType] =
-    Seq(
-      StringTypeWithCollation(supportsTrimCollation = true),
-      IntegerType
-    )
-  override def left: Expression = str
-  override def right: Expression = len
-  override protected def withNewChildrenInternal(
-      newLeft: Expression, newRight: Expression): Expression = {
-    copy(str = newLeft, len = newRight)
+    Seq(StringTypeWithCollation(supportsTrimCollation = true), IntegerType)
+
+  // NOTE: runs at parse time on unresolved args, so it must not read an input's `.dataType`.
+  // The `If` branch types are unified later by type coercion.
+  override def lower(args: Seq[Expression]): Expression = {
+    val str = args(0)
+    val len = args(1)
+    If(
+      IsNull(str),
+      Literal(null, StringType),

Review Comment:
   [P2] Preserve the resolved input type for these branch literals
   
   Hard-coding plain `StringType` changes the result schema when the public 
`spark.sql.preserveCharVarcharTypeInfo` setting is enabled. For example, 
`typeof(right(CAST('abc' AS CHAR(5)), 2))` returned `char(5)` on the base 
implementation, whose null/empty literals used `str.dataType`, but returns 
`string` on this head because `If` widens the `CHAR(5)` substring and these 
`STRING` branches. Registry expression builders receive resolved arguments, and 
the marker's `dataType` delegates to its child, so using `str.dataType` here is 
safe. Could we preserve the old literal typing and add CHAR/VARCHAR result-type 
coverage?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/LowerDelegateExpression.scala:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.DelegateExpression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.DELEGATE_EXPRESSION
+
+/**
+ * Strips every [[DelegateExpression]] down to its `definition`. Run on the optimized logical plan in
+ * [[QueryExecution.createSparkPlan]] -- the single entry point to the planner, used by both the main
+ * query and AQE re-planning -- so the planner and every physical consumer (join-key extraction,
+ * V1 / cached-batch pushdown, columnar rules, codegen) sees the real executed expression rather than
+ * the informational wrapper. Data source V2 pushdown runs earlier, in the logical optimizer, so it
+ * unfolds the wrapper separately in `V2ExpressionBuilder`. The wrapper remains in the optimized
+ * logical plan for EXPLAIN.
+ */
+object LowerDelegateExpression extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(DELEGATE_EXPRESSION)) {

Review Comment:
   [P2] Fully lower a delegate whose definition is another delegate
   
   `transformAllExpressionsWithPruning` is pre-order. If an outer delegate's 
definition is itself a `DelegateExpression`, replacing the outer node causes 
traversal to continue only through the replacement's children; the replacement 
root is not matched again. One pass therefore leaves the inner delegate in the 
plan, despite this rule's `strips every DelegateExpression` contract. A nested 
predicate delegate can consequently remain opaque to join-key extraction or 
physical pushdown. Could this use bottom-up/recursive lowering and include a 
nested-delegate regression?
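
   The difference between the two traversal orders can be reproduced on a toy tree. This is a sketch under stated assumptions: `Expr`, `Leaf`, `Delegate`, `And`, and both `lower*` functions are hypothetical stand-ins, and `lowerPreOrder` only imitates the pre-order behavior described above (apply the rule, then recurse into the result's children without re-matching the result itself):

```scala
object NestedDelegateSketch {
  sealed trait Expr
  final case class Leaf(name: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  // Stand-in for DelegateExpression: a named wrapper around its definition.
  final case class Delegate(name: String, definition: Expr) extends Expr

  private def mapChildren(e: Expr)(f: Expr => Expr): Expr = e match {
    case Delegate(n, d) => Delegate(n, f(d))
    case And(l, r)      => And(f(l), f(r))
    case leaf: Leaf     => leaf
  }

  // Pre-order: rewrite the node, then descend into the replacement's children only.
  def lowerPreOrder(e: Expr): Expr = {
    val afterRule = e match {
      case Delegate(_, definition) => definition
      case other                   => other
    }
    mapChildren(afterRule)(lowerPreOrder)
  }

  // Bottom-up: lower the children first, then rewrite the node itself.
  def lowerBottomUp(e: Expr): Expr =
    mapChildren(e)(lowerBottomUp) match {
      case Delegate(_, definition) => definition
      case other                   => other
    }

  def containsDelegate(e: Expr): Boolean = e match {
    case _: Delegate => true
    case And(l, r)   => containsDelegate(l) || containsDelegate(r)
    case _: Leaf     => false
  }

  def main(args: Array[String]): Unit = {
    val nested = Delegate("outer", Delegate("inner", Leaf("x")))
    println(containsDelegate(lowerPreOrder(nested))) // inner delegate survives one pass
    println(containsDelegate(lowerBottomUp(nested))) // fully lowered
  }
}
```

   Looping the pre-order rewrite on the replacement until it is no longer a delegate would work as well; bottom-up is simply the shorter fix in this model.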



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -94,6 +94,8 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
   private def generateExpression(
       expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = expr match {
     case literal: Literal => Some(translateLiteral(literal))
+    // DelegateExpression is a Spark-internal wrapper; push down its real definition instead.
+    case d: DelegateExpression => generateExpression(d.definition, isPredicate)

Review Comment:
   [P2] Preserve reconstruction mappings for compound delegate predicates
   
   `translateFilterV2WithMapping` treats the wrapper as its leaf case, but this 
new recursion can return a structural `V2And`/`V2Or`. The map then contains 
only `compoundPredicate -> originalDelegate`. If a `SupportsPushDownV2Filters` 
source returns that predicate unchanged because it cannot fully push it, 
`rebuildExpressionFromFilter` decomposes the compound node before consulting 
the map and fails on its synthetic, unmapped children with `Failed to rebuild 
Expression for filter`. Could we unfold before the mapping-aware recursion, or 
prefer an exact map entry before structural descent, and add a rejected-filter 
round-trip test?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -814,7 +814,11 @@ case class AdaptiveSparkPlanExec(
     try {
       logicalPlan.invalidateStatsCache()
       val optimized = optimizer.execute(logicalPlan)
-      val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
+      // Go through QueryExecution.createSparkPlan -- the single place that strips DelegateExpression
+      // before planning -- instead of calling the planner directly, so the re-planned stage sees the
+      // real executed expression.
+      val sparkPlan = QueryExecution.createSparkPlan(

Review Comment:
   [P2] Return the same logical tree that AQE used for planning
   
   When a runtime optimizer rule inserts a `DelegateExpression` and the 
resulting physical plan is adopted, `createSparkPlan` plans a lowered copy 
whose nodes become the physical plan's `logicalLink` targets, while this method 
returns the original unlowered `optimized` tree below. If a copied node backs a 
later query stage, `replaceWithQueryStagesInLogicalPlan` cannot find it using 
reference equality, so the completed stage is not inserted into 
`currentLogicalPlan` and subsequent runtime optimization loses that stage's 
statistics. This is conditional on the new plan being adopted, not every 
runtime-added wrapper. Could AQE retain and return the exact lowered tree used 
for planning, with an injected runtime-optimizer/staged-query regression?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

