szehon-ho commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3456141102


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1548,22 +1549,43 @@ class SparkSqlAstBuilder extends AstBuilder {
     )
   }
 
-  override def visitCreatePipelineDataset(
-      ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
-    val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
-
-    val syntaxTypeErrorStr = if 
(createPipelineDatasetHeaderCtx.materializedView() != null) {
-      "MATERIALIZED VIEW"
-    } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
-      "STREAMING TABLE"
-    } else {
-      // Should never be possible based on grammar definition.
-      throw invalidStatement(ctx.getText, ctx)
-    }
+  override def visitCreateFlowAutoCdc(
+      ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+    val flowHeaderCtx = ctx.createPipelineFlowHeader()
+    val ident = withIdentClause(flowHeaderCtx.flowName, 
UnresolvedIdentifier(_))
+    val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec)
+    val applyChanges = buildAutoCdcIntoCommand(ctx.autoCdcCommand())
+    CreateFlowCommand(
+      name = ident,
+      flowOperation = applyChanges,
+      comment = commentOpt
+    )
+  }
 
-    val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
-    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
-    val (colDefs, colConstraints) = 
Option(ctx.tableElementList()).map(visitTableElementList)
+  /**
+   * Shared helper for pipeline dataset creation statements (CREATE STREAMING 
TABLE,
+   * CREATE MATERIALIZED VIEW, CREATE STREAMING TABLE ... FLOW AUTO CDC).
+   *
+   * Validates and extracts column definitions, partitioning, and table spec 
from the common
+   * grammar elements shared by these statements.
+   *
+   * @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
+   */
+  private def parsePipelineDatasetPrelude(

Review Comment:
   +1 -- after the grammar merge removed `visitCreateStreamingTableAutoCdc`, 
this helper has a single caller (`visitCreatePipelineDataset`), and all of its 
params are just forwarded from that one `ctx`. The dedup rationale is gone, so 
I'd inline it back to avoid the extra 6-param/5-tuple indirection.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =
+    withOrigin(params) {
+      val sourceTable = plan(params.source) match {
+        case r: UnresolvedRelation => r.copy(isStreaming = true)

Review Comment:
   +1. Silently coercing to `isStreaming = true` (and the `case other => other` 
passthrough) hides cases the contract does not actually allow. I'd throw a 
clear error for a non-streaming/non-identifier source and point users at 
`STREAM(...)`, matching the Connect/Python path, which models the source as a 
streaming relation.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =
+    withOrigin(params) {
+      val sourceTable = plan(params.source) match {
+        case r: UnresolvedRelation => r.copy(isStreaming = true)
+        case other => other

Review Comment:
   Agree. Since `source=relationPrimary`, `case other => other` currently 
accepts subqueries, inline `VALUES`, TVFs, etc. as the source child and passes 
them through unmarked. Given SDP expects `source` to be a plain identifier 
(`optional string source`), I'd restrict this to an (optionally 
`STREAM`-wrapped) table identifier and reject the rest. That also lets 
`AutoCdcIntoCommand` carry a `TableIdentifier` and go back to a leaf, as you 
noted.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution)
   }
 
+  protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): 
AutoCdcIntoCommand =
+    withOrigin(ctx) {
+      val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+      val (src, keys, delete, seq, specCols, exceptCols) =
+        parseAutoCdcParams(ctx.autoCdcParameters())
+      AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+    }
+
+  protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+      LogicalPlan,
+      Seq[UnresolvedAttribute],
+      Option[Expression],
+      Expression,
+      Seq[UnresolvedAttribute],
+      Seq[UnresolvedAttribute]) =

Review Comment:
   +1, same reaction. A 6-element tuple destructured at two call sites is easy 
to misorder; a small `AutoCdcParams` case class (or returning 
`AutoCdcIntoCommand` directly) would be clearer.



##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +751,37 @@ dmlStatementNoWith
         notMatchedBySourceClause*                                              
    #mergeIntoTable
     ;
 
+autoCdcCommand
+    : AUTO CDC INTO target=multipartIdentifier
+        autoCdcParameters
+    ;
+
+autoCdcBody
+    : AUTO CDC autoCdcParameters
+    ;
+
+autoCdcParameters
+    : FROM source=relationPrimary
+        KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN

Review Comment:
   +1 -- keys and the COLUMNS list only allow simple column identifiers per the 
contract, so `multipartIdentifierList` lets e.g. `KEYS (a.b.c)` parse and 
pushes the check downstream. Using `identifierSeq`/`identifierList` makes it 
correct by construction.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5529,6 +5529,12 @@
     },
     "sqlState" : "0A000"
   },
+  "MISSING_CLAUSES_FOR_OPERATION" : {

Review Comment:
   Confirmed it should stay, though not for AUTO CDC: on current master 
`QueryParsingErrors.missingClausesForOperation` is referenced by the METRIC 
VIEW path (`SparkSqlParser`), but `MISSING_CLAUSES_FOR_OPERATION` is not 
actually defined in `error-conditions.json` on master -- so this entry is what 
backs that error. Probably cleaner to land it as its own small fix since it is 
orthogonal to AUTO CDC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to