gengliangwang commented on code in PR #55836: URL: https://github.com/apache/spark/pull/55836#discussion_r3239406929
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + */ +case class UnqualifiedColumnName(name: String) { Review Comment: Possible correctness gap with backtick-quoted names. `CatalystSqlParser.parseMultipartIdentifier("`a.b`")` returns `Seq("a.b")` (a single part, no backticks), so the constructor accepts the input — but `name` retains the raw string `` `a.b` ``. Downstream, `ColumnSelection.applyToSchema` matches `name` directly against `schema.fieldNames`, which contain the unquoted form `a.b`. 
So a user who writes `UnqualifiedColumnName("`a.b`")` to refer to the schema column literally named `a.b` will always hit `COLUMNS_NOT_FOUND`. The existing test `UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot` only asserts that `.name` round-trips the raw input; it doesn't cover the include/exclude lookup, which would fail. Suggest normalizing on construction: ```scala case class UnqualifiedColumnName private (name: String) object UnqualifiedColumnName { def apply(input: String): UnqualifiedColumnName = { val parts = CatalystSqlParser.parseMultipartIdentifier(input) if (parts.length != 1) throw multipartColumnIdentifierError(input, parts) new UnqualifiedColumnName(parts.head) } } ``` This way `name` always equals the schema field name and the include/exclude lookup behaves as users expect. ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + */ +case class UnqualifiedColumnName(name: String) { + UnqualifiedColumnName.validate(name) +} + +object UnqualifiedColumnName { + private def validate(columnName: String): Unit = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(columnName, nameParts) + } + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + type ColumnList = Seq[UnqualifiedColumnName] + + case class IncludeColumns(columns: ColumnList) extends ColumnSelection + case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. + * Field names are matched exactly. Field order follows the original schema (filtered in place). + */ + def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = + columnSelection match { + case None => + // A none column selection is interpreted as a no-op. 
+ schema + case Some(IncludeColumns(includeColumns)) => + validateColumnsExistInSchema(includeColumns, schema) + + val includeColumnSet = includeColumns.map(_.name).toSet + StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) + case Some(ExcludeColumns(excludeColumns)) => + validateColumnsExistInSchema(excludeColumns, schema) + + val excludeColumnSet = excludeColumns.map(_.name).toSet + StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) + } + + private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { + val schemaColumns = schema.fieldNames.toSet + val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct + if (missingColumns.nonEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + messageParameters = Map( + "missingColumns" -> missingColumns.mkString(", "), + "availableColumns" -> schema.fieldNames.mkString(", ") + )) + } + } +} + +/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ +sealed trait ScdType + +object ScdType { + /** Representation for the standard SCD1 strategy. */ + case object Type1 extends ScdType + /** Representation for the standard SCD2 strategy. */ + case object Type2 extends ScdType +} + +/** + * Configuration for an AutoCDC flow. + * + * @param keys The column(s) that uniquely identify a row in the source data. + * @param sequencing Expression ordering CDC events to correctly resolve out-of-order + * arrivals. Must be a sortable type. + * @param deleteCondition Expression that marks a source row as a DELETE. When None, all + * rows are treated as upserts. + * @param storedAsScdType The SCD strategy these args should be applied to. + * @param columnSelection Which source columns to select in the target table. None means + * all columns. 
+ */ +case class ChangeArgs( + keys: Seq[String], Review Comment: Type inconsistency: `keys` is `Seq[String]` while `columnSelection` uses `Seq[UnqualifiedColumnName]`. The SPIP semantics for `keys` are the same — they must be single, unqualified column references in the source schema. Using `Seq[UnqualifiedColumnName]` here would give you free construction-time validation and a uniform vocabulary across the dataclass. (If `keys` is intentionally left as `Seq[String]` for now and will be validated/normalized in a follow-up, please add a TODO so it isn't forgotten.) ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). 
The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + */ +case class UnqualifiedColumnName(name: String) { + UnqualifiedColumnName.validate(name) +} + +object UnqualifiedColumnName { + private def validate(columnName: String): Unit = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(columnName, nameParts) + } + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + type ColumnList = Seq[UnqualifiedColumnName] + + case class IncludeColumns(columns: ColumnList) extends ColumnSelection + case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. + * Field names are matched exactly. Field order follows the original schema (filtered in place). Review Comment: Worth calling out in the scaladoc that field matching is case-sensitive and exact (no support for `spark.sql.caseSensitive=false`). The test comments document this intent, but a user reading only the public API will likely expect the global config to apply. A one-liner like ``` // Matching is case-sensitive regardless of spark.sql.caseSensitive. ``` would be helpful. (Or, if matching it to the session config is desired, do it now before the type is depended on.) 
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. 
+ */ +case class UnqualifiedColumnName(name: String) { + UnqualifiedColumnName.validate(name) +} + +object UnqualifiedColumnName { + private def validate(columnName: String): Unit = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(columnName, nameParts) + } + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + type ColumnList = Seq[UnqualifiedColumnName] + + case class IncludeColumns(columns: ColumnList) extends ColumnSelection + case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. + * Field names are matched exactly. Field order follows the original schema (filtered in place). + */ + def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = + columnSelection match { + case None => + // A none column selection is interpreted as a no-op. 
+ schema + case Some(IncludeColumns(includeColumns)) => + validateColumnsExistInSchema(includeColumns, schema) + + val includeColumnSet = includeColumns.map(_.name).toSet + StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) + case Some(ExcludeColumns(excludeColumns)) => + validateColumnsExistInSchema(excludeColumns, schema) + + val excludeColumnSet = excludeColumns.map(_.name).toSet + StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) + } + + private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { + val schemaColumns = schema.fieldNames.toSet + val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct Review Comment: Minor: `validateColumnsExistInSchema` is called from both `IncludeColumns` and `ExcludeColumns` branches, and `applyToSchema` then immediately reconstructs an essentially equivalent set on the caller side. You could combine these into one helper that returns the filtered fields and the validated set in one pass, or simply have `validateColumnsExistInSchema` return the set so `applyToSchema` doesn't recompute it. Not a blocker. ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + */ +case class UnqualifiedColumnName(name: String) { + UnqualifiedColumnName.validate(name) +} + +object UnqualifiedColumnName { + private def validate(columnName: String): Unit = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(columnName, nameParts) + } + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + type ColumnList = Seq[UnqualifiedColumnName] + + case class IncludeColumns(columns: ColumnList) extends ColumnSelection + case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. + * Field names are matched exactly. Field order follows the original schema (filtered in place). + */ + def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = + columnSelection match { + case None => + // A none column selection is interpreted as a no-op. 
+ schema + case Some(IncludeColumns(includeColumns)) => + validateColumnsExistInSchema(includeColumns, schema) + + val includeColumnSet = includeColumns.map(_.name).toSet + StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) + case Some(ExcludeColumns(excludeColumns)) => + validateColumnsExistInSchema(excludeColumns, schema) + + val excludeColumnSet = excludeColumns.map(_.name).toSet + StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) + } + + private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { + val schemaColumns = schema.fieldNames.toSet + val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct + if (missingColumns.nonEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + messageParameters = Map( + "missingColumns" -> missingColumns.mkString(", "), + "availableColumns" -> schema.fieldNames.mkString(", ") + )) + } + } +} + +/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ +sealed trait ScdType + +object ScdType { + /** Representation for the standard SCD1 strategy. */ + case object Type1 extends ScdType + /** Representation for the standard SCD2 strategy. */ + case object Type2 extends ScdType +} + +/** + * Configuration for an AutoCDC flow. + * + * @param keys The column(s) that uniquely identify a row in the source data. + * @param sequencing Expression ordering CDC events to correctly resolve out-of-order + * arrivals. Must be a sortable type. + * @param deleteCondition Expression that marks a source row as a DELETE. When None, all + * rows are treated as upserts. + * @param storedAsScdType The SCD strategy these args should be applied to. + * @param columnSelection Which source columns to select in the target table. None means + * all columns. 
+ */ +case class ChangeArgs( + keys: Seq[String], + sequencing: Column, + storedAsScdType: ScdType, + deleteCondition: Option[Column] = None, + columnSelection: Option[ColumnSelection] = None +) Review Comment: Nit: `ChangeArgs` is a `case class` carrying two `Column` fields. `Column` does not have a content-based `equals`/`hashCode` (it compares by reference / underlying expression node identity), so two `ChangeArgs` built from equivalent but separately constructed `Column` instances will compare unequal. If any downstream code plans to use `ChangeArgs` as a Map key, deduplicate it, or assert equality in tests, this will be surprising. Either (a) document this caveat, or (b) override `equals`/`hashCode` to compare `sequencing.expr.canonicalized` / `deleteCondition.map(_.expr.canonicalized)`. ########## sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class ChangeArgsSuite extends SparkFunSuite { + + private val sourceSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("Name", StringType) + .add("age", IntegerType) + + test("ColumnSelection None leaves schema unchanged") { + assert(ColumnSelection.applyToSchema(sourceSchema, None) == sourceSchema) + } + + test("ColumnSelection IncludeColumns filters by exact name in schema order") { + val filteredSchema = ColumnSelection.applyToSchema( + sourceSchema, + Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))))) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection ExcludeColumns filters by exact name") { + val filteredSchema = ColumnSelection.applyToSchema( + sourceSchema, + Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))))) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection IncludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + sourceSchema, + // Column inclusion is case-sensitive; "name" will not match against "Name". 
+ Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")))) + ) + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + sqlState = "42703", + parameters = Map( + "missingColumns" -> "name, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection ExcludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + sourceSchema, + // Column exclusion is case-sensitive; "NAME" will not match against "Name". + Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")))) + ) + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + sqlState = "42703", + parameters = Map( + "missingColumns" -> "NAME, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("UnqualifiedColumnName accepts a simple single-part identifier") { + assert(UnqualifiedColumnName("col").name == "col") + } + + test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { + // Backticks make the dot part of a single name part, so this passes validation. + assert(UnqualifiedColumnName("`a.b`").name == "`a.b`") Review Comment: Suggestion: add a positive test exercising `ColumnSelection.applyToSchema` with `UnqualifiedColumnName("`a.b`")` against a `StructType` that contains a field literally named `a.b`. As written this would today raise `COLUMNS_NOT_FOUND` (see comment on `UnqualifiedColumnName`), which is probably not the intended behavior. A round-trip test would have caught it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org
