szehon-ho commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3456141102
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1548,22 +1549,43 @@ class SparkSqlAstBuilder extends AstBuilder {
)
}
- override def visitCreatePipelineDataset(
- ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
- val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
-
- val syntaxTypeErrorStr = if
(createPipelineDatasetHeaderCtx.materializedView() != null) {
- "MATERIALIZED VIEW"
- } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
- "STREAMING TABLE"
- } else {
- // Should never be possible based on grammar definition.
- throw invalidStatement(ctx.getText, ctx)
- }
+ override def visitCreateFlowAutoCdc(
+ ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+ val flowHeaderCtx = ctx.createPipelineFlowHeader()
+ val ident = withIdentClause(flowHeaderCtx.flowName,
UnresolvedIdentifier(_))
+ val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec)
+ val applyChanges = buildAutoCdcIntoCommand(ctx.autoCdcCommand())
+ CreateFlowCommand(
+ name = ident,
+ flowOperation = applyChanges,
+ comment = commentOpt
+ )
+ }
- val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
- val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
- val (colDefs, colConstraints) =
Option(ctx.tableElementList()).map(visitTableElementList)
+ /**
+ * Shared helper for pipeline dataset creation statements (CREATE STREAMING
TABLE,
+ * CREATE MATERIALIZED VIEW, CREATE STREAMING TABLE ... FLOW AUTO CDC).
+ *
+ * Validates and extracts column definitions, partitioning, and table spec
from the common
+ * grammar elements shared by these statements.
+ *
+ * @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
+ */
+ private def parsePipelineDatasetPrelude(
Review Comment:
+1 -- after the grammar merge removed `visitCreateStreamingTableAutoCdc`,
this helper has a single caller (`visitCreatePipelineDataset`), and all of its
params are just forwarded from that one `ctx`. The dedup rationale is gone, so
I'd inline it back to avoid the extra 6-param/5-tuple indirection.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ val sourceTable = plan(params.source) match {
+ case r: UnresolvedRelation => r.copy(isStreaming = true)
Review Comment:
+1. Silently coercing to `isStreaming = true` (and the `case other => other`
passthrough) hides cases the contract does not actually allow. I'd throw a
clear error for a non-streaming/non-identifier source and point users at
`STREAM(...)`, matching the Connect/Python path, which models the source as a
streaming relation.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ val sourceTable = plan(params.source) match {
+ case r: UnresolvedRelation => r.copy(isStreaming = true)
+ case other => other
Review Comment:
Agree. Since `source=relationPrimary`, `case other => other` currently
accepts subqueries, inline `VALUES`, TVFs, etc. as the source child and passes
them through unmarked. Given SDP expects `source` to be a plain identifier
(`optional string source`), I'd restrict this to an (optionally
`STREAM`-wrapped) table identifier and reject the rest. That also lets
`AutoCdcIntoCommand` carry a `TableIdentifier` and go back to being a leaf
node, as you noted.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
Review Comment:
+1, same reaction. A 6-element tuple destructured at two call sites is easy
to misorder; a small `AutoCdcParams` case class (or returning
`AutoCdcIntoCommand` directly) would be clearer.
##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +751,37 @@ dmlStatementNoWith
notMatchedBySourceClause*
#mergeIntoTable
;
+autoCdcCommand
+ : AUTO CDC INTO target=multipartIdentifier
+ autoCdcParameters
+ ;
+
+autoCdcBody
+ : AUTO CDC autoCdcParameters
+ ;
+
+autoCdcParameters
+ : FROM source=relationPrimary
+ KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN
Review Comment:
+1 -- keys and the COLUMNS list only allow simple column identifiers per the
contract, so `multipartIdentifierList` lets e.g. `KEYS (a.b.c)` parse and
pushes the check downstream. Using `identifierSeq`/`identifierList` makes it
correct by construction.
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5529,6 +5529,12 @@
},
"sqlState" : "0A000"
},
+ "MISSING_CLAUSES_FOR_OPERATION" : {
Review Comment:
Confirmed it should stay, though not for AUTO CDC: on current master
`QueryParsingErrors.missingClausesForOperation` is referenced by the METRIC
VIEW path (`SparkSqlParser`), but `MISSING_CLAUSES_FOR_OPERATION` is not
actually defined in `error-conditions.json` on master -- so this entry is what
backs that error. Probably cleaner to land it as its own small fix since it is
orthogonal to AUTO CDC.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]