AnishMahto commented on code in PR #56419:
URL: https://github.com/apache/spark/pull/56419#discussion_r3410612346
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ val sourceTable = plan(params.source) match {
+ case r: UnresolvedRelation => r.copy(isStreaming = true)
Review Comment:
Instead of silently forcing the unresolved relation to be streaming, can we
just throw if the user tries specifying a non-streaming relation? The error can
guide users to use `STREAM`.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ val sourceTable = plan(params.source) match {
+ case r: UnresolvedRelation => r.copy(isStreaming = true)
+ case other => other
Review Comment:
Eventually in SDP we expect `source` to simply be a string identifier:
https://github.com/apache/spark/blob/62e4e166698f95bcbeb16687ceaa61a3a8f14dbc/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala#L444
https://github.com/apache/spark/blob/62e4e166698f95bcbeb16687ceaa61a3a8f14dbc/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto#L162
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
+ withOrigin(params) {
+ val sourceTable = plan(params.source) match {
+ case r: UnresolvedRelation => r.copy(isStreaming = true)
+ case other => other
Review Comment:
Should we reject all other logical plans here?
I don't know how this concretely maps against logical plan subclasses, but
at the syntax level we should only be allowing source to be a multipart
identifier, or STREAM(multipart identifier).
I _think_ both of these will map directly to an `UnresolvedRelation`. And
then can we take this a step further, where `AutoCdcIntoCommand` requires
`source` to be a `TableIdentifier`/`Identifier` rather than an arbitrary
`LogicalPlan`? Then I guess `AutoCdcIntoCommand` can go back to being a leaf
plan type.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1548,22 +1549,43 @@ class SparkSqlAstBuilder extends AstBuilder {
)
}
- override def visitCreatePipelineDataset(
- ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
- val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
-
- val syntaxTypeErrorStr = if
(createPipelineDatasetHeaderCtx.materializedView() != null) {
- "MATERIALIZED VIEW"
- } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
- "STREAMING TABLE"
- } else {
- // Should never be possible based on grammar definition.
- throw invalidStatement(ctx.getText, ctx)
- }
+ override def visitCreateFlowAutoCdc(
+ ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) {
+ val flowHeaderCtx = ctx.createPipelineFlowHeader()
+ val ident = withIdentClause(flowHeaderCtx.flowName,
UnresolvedIdentifier(_))
+ val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec)
+ val applyChanges = buildAutoCdcIntoCommand(ctx.autoCdcCommand())
+ CreateFlowCommand(
+ name = ident,
+ flowOperation = applyChanges,
+ comment = commentOpt
+ )
+ }
- val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
- val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
- val (colDefs, colConstraints) =
Option(ctx.tableElementList()).map(visitTableElementList)
+ /**
+ * Shared helper for pipeline dataset creation statements (CREATE STREAMING
TABLE,
+ * CREATE MATERIALIZED VIEW, CREATE STREAMING TABLE ... FLOW AUTO CDC).
+ *
+ * Validates and extracts column definitions, partitioning, and table spec
from the common
+ * grammar elements shared by these statements.
+ *
+ * @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
+ */
+ private def parsePipelineDatasetPrelude(
Review Comment:
Do we still need this function now that the same `createPipelineDataset`
rule is hit for both `AS query` and `FLOW AUTO CDC` sub-rules?
##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -750,6 +751,37 @@ dmlStatementNoWith
notMatchedBySourceClause*
#mergeIntoTable
;
+autoCdcCommand
+ : AUTO CDC INTO target=multipartIdentifier
+ autoCdcParameters
+ ;
+
+autoCdcBody
+ : AUTO CDC autoCdcParameters
+ ;
+
+autoCdcParameters
+ : FROM source=relationPrimary
+ KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN
Review Comment:
For keys and the column selection clause, we don't actually allow multipart
identifiers as part of the AutoCDC API contract.
Should we use `identifier` instead of `multipartIdentifierList` to reflect
this? Either way SDP will invalidate multipart identifiers, but I figure we
should make it true by construction where possible.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1362,6 +1362,44 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution)
}
+ protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext):
AutoCdcIntoCommand =
+ withOrigin(ctx) {
+ val target = visitMultipartIdentifier(ctx.target).asTableIdentifier
+ val (src, keys, delete, seq, specCols, exceptCols) =
+ parseAutoCdcParams(ctx.autoCdcParameters())
+ AutoCdcIntoCommand(target, src, keys, delete, seq, specCols, exceptCols)
+ }
+
+ protected def parseAutoCdcParams(params: AutoCdcParametersContext): (
+ LogicalPlan,
+ Seq[UnresolvedAttribute],
+ Option[Expression],
+ Expression,
+ Seq[UnresolvedAttribute],
+ Seq[UnresolvedAttribute]) =
Review Comment:
optional: Might be nice to put this in a case class before returning, and
then `AutoCdcIntoCommand` can also just hold a reference to said case class. Or
maybe even just directly return an `AutoCdcIntoCommand` from this function.
I generally don't like returning a tuple longer than 2-3 elements; it's hard
to keep track of position <-> semantic value mentally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]