gengliangwang commented on code in PR #55949:
URL: https://github.com/apache/spark/pull/55949#discussion_r3262406447
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -59,19 +59,31 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode
/**
* A logical plan placeholder that holds the identifier clause string expression. It will be
* replaced by the actual logical plan with the evaluated identifier string.
+ *
+ * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g.
+ * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command.
+ *
+ * The parser always places this node inside the command's identifier slot (a child slot for
+ * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for `InsertIntoStatement.table` and
+ * `OverwriteByExpression.table` -- handled via explicit cases in `ResolveIdentifierClause` and
+ * `BindParameters`). It is never the substitution root of a `WITH ... <command>` subtree, so
+ * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` produced by
+ * `CTESubstitution` targets the inner command directly.
*/
case class PlanWithUnresolvedIdentifier(
identifierExpr: Expression,
children: Seq[LogicalPlan],
planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan)
- extends UnresolvedNode {
+ extends UnresolvedNode with NamedRelation {
def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = {
this(identifierExpr, Nil, (ident, _) => planBuilder(ident))
}
final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER)
+ override def name: String = identifierExpr.sql
Review Comment:
`PlanWithUnresolvedIdentifier` is now reachable as a `NamedRelation` via two
error paths I traced: `SparkStrategies.extractTableNameForError`
(`sql/core/.../SparkStrategies.scala:1135`) and the `r: NamedRelation =>
toSQLId(r.name)` fallback in `QueryCompilationErrors.scala:3635`. Both will
render the SQL text of the unresolved expression (e.g. `IDENTIFIER(:p)` or
`concat('a', 'b')`) as the "table name" in error messages. That's a reasonable
fallback, but worth a one-line comment on this override noting that
`identifierExpr.sql` is intentionally an unresolved-placeholder stand-in for
those error paths.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala:
##########
@@ -70,6 +71,37 @@ class ResolveIdentifierClause(earlyBatches:
Seq[RuleExecutor[LogicalPlan]#Batch]
executor.execute(p.planBuilder.apply(
IdentifierResolution.evalIdentifierExpr(p.identifierExpr),
p.children))
+ // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan slots
+ // (`child = query`), so the standard `resolveOperatorsUp` traversal never visits
+ // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and
+ // `OverwriteByExpression` carry a parse-time placeholder today, but matching the
+ // `V2WriteCommand` trait keeps the rule consistent across the family.
+ case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] && {
+ val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+ p.identifierExpr.resolved && p.childrenResolved
+ } =>
+ val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+ if (referredTempVars.isDefined) {
+ referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+ }
+ i.copy(table = executor.execute(p.planBuilder.apply(
+ IdentifierResolution.evalIdentifierExpr(p.identifierExpr),
p.children)))
Review Comment:
The PR description shows this as `case i @ InsertIntoStatement(p:
PlanWithUnresolvedIdentifier, ...) ...`, but the code uses `isInstanceOf` in
the guard and `asInstanceOf` in both the guard and the body (the cast appears
twice). Either update the description, or extract the cast once — e.g. guard
with `i.table.isInstanceOf[PlanWithUnresolvedIdentifier]`, then do the
`identifierExpr.resolved && childrenResolved` check and the materialization in
the body, returning `i` unchanged when not yet resolved. The same applies to
the `V2WriteCommand` case below.
##########
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
##########
@@ -2460,4 +2462,175 @@ class ParametersSuite extends SharedSparkSession {
spark.sql("SELECT 1", Array.empty[Any]),
Row(1))
}
+
+ // SPARK-46625: WITH ... <write-with-IDENTIFIER> SELECT ... FROM cte
+ // The placeholder is pushed into the command's identifier slot at parse time, so
+ // `CTESubstitution` sees the `CTEInChildren` directly and never produces the invalid
+ // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape.
+ private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = {
+ df.queryExecution.analyzed.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
+ fail(s"Found invalid WithCTE(CTEInChildren, _) shape:\n${df.queryExecution.analyzed}")
+ case _ =>
+ }
+ }
+
+ test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") {
+ withTable("t_cte_overwrite") {
+ sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET")
+ sql("INSERT INTO t_cte_overwrite VALUES (10)")
+ val df = spark.sql(
+ """WITH transformation AS (SELECT 1 AS a)
+ |INSERT OVERWRITE TABLE IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_overwrite"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_overwrite"), Row(1))
+ }
+ }
+
+ test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") {
+ withTable("t_cte_into") {
+ sql("CREATE TABLE t_cte_into (a INT) USING PARQUET")
+ val df = spark.sql(
+ """WITH transformation AS (SELECT 7 AS a)
+ |INSERT INTO IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_into"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_into"), Row(7))
+ }
+ }
+
+ test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte") {
+ withTable("t_cte_ctas") {
+ val df = spark.sql(
+ """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS
+ |WITH transformation AS (SELECT 3 AS a)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_ctas"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_ctas"), Row(3))
+ }
+ }
+
+ // SPARK-46625: legacy parameter-substitution mode triggers the
parameters.scala traversal
+ // path. The placeholder lives in `InsertIntoStatement.table`, which is
*not* a child, so this
+ // exercises the `InsertIntoStatement` special-case in `BindParameters.bind`
that recurses into
+ // the `table` slot, and the `getDefaultTreePatternBits` override on
`InsertIntoStatement` that
+ // exposes `table`'s tree-pattern bits for pruning.
+ test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") {
+ withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key ->
"true") {
+ withTable("t_legacy_param") {
+ sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET")
+ spark.sql(
+ """WITH transformation AS (SELECT 11 AS a)
+ |INSERT INTO IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_legacy_param"))
+ checkAnswer(spark.table("t_legacy_param"), Row(11))
+ }
+ }
+ }
+
+ // SPARK-46625: INSERT INTO REPLACE WHERE goes through
`OverwriteByExpression`, whose `table`
+ // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends
`NamedRelation` so the
+ // placeholder sits in the slot directly. Verify on the parsed plan that the
placeholder lives
+ // in `OverwriteByExpression.table` rather than wrapping the whole command
-- running the
+ // analyzer fully would require a v2 catalog.
+ test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... parser") {
+ // Use a non-literal-string expression so `withIdentClause` produces
+ // `PlanWithUnresolvedIdentifier` rather than short-circuiting to
`UnresolvedRelation`.
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """WITH transformation AS (SELECT 99 AS a)
+ |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10
+ |SELECT * FROM transformation""".stripMargin)
+ val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression =>
o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+ assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier],
+ s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " +
+ s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan")
+ // After CTESubstitution runs, the CTE defs should land on the command's
children (because
+ // OverwriteByExpression is a CTEInChildren) -- never as
`WithCTE(OverwriteByExpression, _)`.
+ val substituted = CTESubstitution.apply(parsedPlan)
+ substituted.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+ case _ =>
+ }
+ }
+
+ // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in
+ // `OverwriteByExpression.table`, which is a non-child slot. Verify that
+ // `BindParameters.bind` reaches into the slot via the explicit
`OverwriteByExpression`
+ // recursion (parameters.scala) and that the `getDefaultTreePatternBits`
override on
+ // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at
the rule level
+ // because driving REPLACE WHERE through full analysis would require a v2
catalog.
+ test("SPARK-46625: BindParameters recurses into OverwriteByExpression.table") {
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10
+ |SELECT 1 AS a""".stripMargin)
+ val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression =>
o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+ // Pruning prerequisite: the PARAMETER bit must be visible at the
OverwriteByExpression
+ // level (it lives inside `table`, which is not a child); this exercises
the
+ // `getDefaultTreePatternBits` override.
+ assert(overwrite.containsPattern(PARAMETER),
+ "OverwriteByExpression.getDefaultTreePatternBits must propagate `table`'s PARAMETER bit")
+
+ val bound = BindParameters.apply(
+ NameParameterizedQuery(parsedPlan, Seq("tname"),
Seq(Literal("foo_table"))))
+ val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression =>
o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in bound plan:\n$bound"))
+ assert(!boundOverwrite.table.containsPattern(PARAMETER),
+ s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite")
+ }
+
+ // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot,
so an
+ // `IDENTIFIER(<non-literal>)` produces an
`ExpressionWithUnresolvedIdentifier` there instead of
+ // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. Verify
on the parsed plan
+ // that the name slot holds the expression placeholder and no
`WithCTE(CTEInChildren, _)` shape
+ // survives `CTESubstitution` (running the cache through full analysis would
require the temp
+ // view machinery, so this is a parser-level test).
+ test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") {
Review Comment:
End-to-end coverage gap: the CACHE TABLE / REPLACE TABLE / `INSERT INTO
IDENTIFIER REPLACE WHERE` tests are all parser-level. For CACHE TABLE
specifically an e2e test doesn't need v2-catalog or temp-view machinery beyond
what `SharedSparkSession` already provides — `CACHE TABLE IDENTIFIER('temp_v')
AS WITH ... SELECT ...` should resolve and execute. That would exercise the
`tempViewNameString` extraction in `DataSourceV2Strategy:781` and the new
`CheckAnalysis` invariant case for `CacheTableAsSelect`, neither of which any
test currently reaches.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]