gengliangwang commented on code in PR #55836:
URL: https://github.com/apache/spark/pull/55836#discussion_r3239406929


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A column reference that must be a single, unqualified identifier (no nested 
field path and
+ * no table/alias qualifier). The constructor parses [[name]] with the Spark 
SQL parser and
+ * throws an [[AnalysisException]] if it does not resolve to exactly one name 
part.
+ */
+case class UnqualifiedColumnName(name: String) {

Review Comment:
   Possible correctness gap with backtick-quoted names. 
``CatalystSqlParser.parseMultipartIdentifier("`a.b`")`` returns `Seq("a.b")` (a 
single part, with the backticks stripped), so the constructor accepts the input, 
but `name` retains the raw string `` `a.b` ``. Downstream, 
`ColumnSelection.applyToSchema` matches `name` directly against 
`schema.fieldNames`, which contain the unquoted form `a.b`. So a user who writes 
``UnqualifiedColumnName("`a.b`")`` to refer to the schema column literally named 
`a.b` will always hit `COLUMNS_NOT_FOUND`.
   
   The existing test `UnqualifiedColumnName accepts a backtick-quoted name 
containing a literal dot` only asserts that `.name` round-trips the raw input; 
it doesn't cover the include/exclude lookup, which would fail.
   
   Suggest normalizing on construction:
   ```scala
   case class UnqualifiedColumnName private (name: String)
   object UnqualifiedColumnName {
     def apply(input: String): UnqualifiedColumnName = {
       val parts = CatalystSqlParser.parseMultipartIdentifier(input)
       if (parts.length != 1) throw multipartColumnIdentifierError(input, parts)
       new UnqualifiedColumnName(parts.head)
     }
   }
   ```
   This way `name` always equals the schema field name and the include/exclude 
lookup behaves as users expect.



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A column reference that must be a single, unqualified identifier (no nested 
field path and
+ * no table/alias qualifier). The constructor parses [[name]] with the Spark 
SQL parser and
+ * throws an [[AnalysisException]] if it does not resolve to exactly one name 
part.
+ */
+case class UnqualifiedColumnName(name: String) {
+  UnqualifiedColumnName.validate(name)
+}
+
+object UnqualifiedColumnName {
+  private def validate(columnName: String): Unit = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(columnName, nameParts)
+    }
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = 
"AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+  type ColumnList = Seq[UnqualifiedColumnName]
+
+  case class IncludeColumns(columns: ColumnList) extends ColumnSelection
+  case class ExcludeColumns(columns: ColumnList) extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema.
+   * Field names are matched exactly. Field order follows the original schema 
(filtered in place).
+   */
+  def applyToSchema(schema: StructType, columnSelection: 
Option[ColumnSelection]): StructType =
+    columnSelection match {
+      case None =>
+        // A none column selection is interpreted as a no-op.
+        schema
+      case Some(IncludeColumns(includeColumns)) =>
+        validateColumnsExistInSchema(includeColumns, schema)
+
+        val includeColumnSet = includeColumns.map(_.name).toSet
+        StructType(schema.fields.filter(f => 
includeColumnSet.contains(f.name)))
+      case Some(ExcludeColumns(excludeColumns)) =>
+        validateColumnsExistInSchema(excludeColumns, schema)
+
+        val excludeColumnSet = excludeColumns.map(_.name).toSet
+        StructType(schema.fields.filterNot(f => 
excludeColumnSet.contains(f.name)))
+    }
+
+  private def validateColumnsExistInSchema(columns: ColumnList, schema: 
StructType): Unit = {
+    val schemaColumns = schema.fieldNames.toSet
+    val missingColumns = 
columns.map(_.name).filterNot(schemaColumns.contains).distinct
+    if (missingColumns.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
+        messageParameters = Map(
+          "missingColumns" -> missingColumns.mkString(", "),
+          "availableColumns" -> schema.fieldNames.mkString(", ")
+        ))
+    }
+  }
+}
+
+/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
+sealed trait ScdType
+
+object ScdType {
+  /** Representation for the standard SCD1 strategy. */
+  case object Type1 extends ScdType
+  /** Representation for the standard SCD2 strategy. */
+  case object Type2 extends ScdType
+}
+
+/**
+ * Configuration for an AutoCDC flow.
+ *
+ * @param keys            The column(s) that uniquely identify a row in the 
source data.
+ * @param sequencing      Expression ordering CDC events to correctly resolve 
out-of-order
+ *                        arrivals. Must be a sortable type.
+ * @param deleteCondition Expression that marks a source row as a DELETE. When 
None, all
+ *                        rows are treated as upserts.
+ * @param storedAsScdType The SCD strategy these args should be applied to.
+ * @param columnSelection Which source columns to select in the target table. 
None means
+ *                        all columns.
+ */
+case class ChangeArgs(
+    keys: Seq[String],

Review Comment:
   Type inconsistency: `keys` is `Seq[String]` while `columnSelection` uses 
`Seq[UnqualifiedColumnName]`. The SPIP semantics for `keys` are the same: they 
must be single, unqualified column references in the source schema. Using 
`Seq[UnqualifiedColumnName]` here would give you construction-time validation 
for free and a uniform vocabulary across the case class. (If `keys` is 
intentionally left as `Seq[String]` for now and will be validated/normalized in 
a follow-up, please add a TODO so it isn't forgotten.)



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A column reference that must be a single, unqualified identifier (no nested 
field path and
+ * no table/alias qualifier). The constructor parses [[name]] with the Spark 
SQL parser and
+ * throws an [[AnalysisException]] if it does not resolve to exactly one name 
part.
+ */
+case class UnqualifiedColumnName(name: String) {
+  UnqualifiedColumnName.validate(name)
+}
+
+object UnqualifiedColumnName {
+  private def validate(columnName: String): Unit = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(columnName, nameParts)
+    }
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = 
"AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+  type ColumnList = Seq[UnqualifiedColumnName]
+
+  case class IncludeColumns(columns: ColumnList) extends ColumnSelection
+  case class ExcludeColumns(columns: ColumnList) extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema.
+   * Field names are matched exactly. Field order follows the original schema 
(filtered in place).

Review Comment:
   Worth calling out in the scaladoc that field matching is case-sensitive and 
exact (no support for `spark.sql.caseSensitive=false`). The test comments 
document this intent, but a user reading only the public API will likely expect 
the global config to apply. A one-liner like
   ```
   // Matching is case-sensitive regardless of spark.sql.caseSensitive.
   ```
   would be helpful. (Or, if matching should follow the session config, make 
that change now, before other code depends on this type.)
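   
   If matching were to follow the session config, one possible shape is to pass 
a name-comparison function into the filter. The sketch below is plain Scala with 
hypothetical stand-ins (`Resolver`, `includeFields`, and field names as a 
`Seq[String]` rather than a `StructType`). It is not the PR's code, and how the 
function would be chosen from `spark.sql.caseSensitive` is left as an assumption.
   ```scala
   object CaseSensitivitySketch {
     // Hypothetical stand-in for a name-comparison function.
     type Resolver = (String, String) => Boolean

     val caseSensitive: Resolver = (a, b) => a == b
     val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

     // Keeps the fields that match any requested name, in schema order.
     def includeFields(
         fieldNames: Seq[String],
         requested: Seq[String],
         resolver: Resolver): Seq[String] =
       fieldNames.filter(field => requested.exists(name => resolver(name, field)))
   }
   ```
   With `caseSensitive`, a request for `name` does not match the field `Name`; 
with `caseInsensitive`, it does.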



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A column reference that must be a single, unqualified identifier (no nested 
field path and
+ * no table/alias qualifier). The constructor parses [[name]] with the Spark 
SQL parser and
+ * throws an [[AnalysisException]] if it does not resolve to exactly one name 
part.
+ */
+case class UnqualifiedColumnName(name: String) {
+  UnqualifiedColumnName.validate(name)
+}
+
+object UnqualifiedColumnName {
+  private def validate(columnName: String): Unit = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(columnName, nameParts)
+    }
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = 
"AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+  type ColumnList = Seq[UnqualifiedColumnName]
+
+  case class IncludeColumns(columns: ColumnList) extends ColumnSelection
+  case class ExcludeColumns(columns: ColumnList) extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema.
+   * Field names are matched exactly. Field order follows the original schema 
(filtered in place).
+   */
+  def applyToSchema(schema: StructType, columnSelection: 
Option[ColumnSelection]): StructType =
+    columnSelection match {
+      case None =>
+        // A none column selection is interpreted as a no-op.
+        schema
+      case Some(IncludeColumns(includeColumns)) =>
+        validateColumnsExistInSchema(includeColumns, schema)
+
+        val includeColumnSet = includeColumns.map(_.name).toSet
+        StructType(schema.fields.filter(f => 
includeColumnSet.contains(f.name)))
+      case Some(ExcludeColumns(excludeColumns)) =>
+        validateColumnsExistInSchema(excludeColumns, schema)
+
+        val excludeColumnSet = excludeColumns.map(_.name).toSet
+        StructType(schema.fields.filterNot(f => 
excludeColumnSet.contains(f.name)))
+    }
+
+  private def validateColumnsExistInSchema(columns: ColumnList, schema: 
StructType): Unit = {
+    val schemaColumns = schema.fieldNames.toSet
+    val missingColumns = 
columns.map(_.name).filterNot(schemaColumns.contains).distinct

Review Comment:
   Minor: `validateColumnsExistInSchema` is called from both `IncludeColumns` 
and `ExcludeColumns` branches, and `applyToSchema` then immediately 
reconstructs an essentially equivalent set on the caller side. You could 
combine these into one helper that returns the filtered fields and the 
validated set in one pass, or simply have `validateColumnsExistInSchema` return 
the set so `applyToSchema` doesn't recompute it. Not a blocker.
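   
   A minimal sketch of the second option, in plain Scala with hypothetical 
stand-ins: field names are a `Seq[String]` instead of a `StructType`, and 
`ColumnsNotFound` stands in for the `COLUMNS_NOT_FOUND` `AnalysisException`. The 
helper name `validatedNameSet` is illustrative only.
   ```scala
   object ValidatedSelectionSketch {
     // Hypothetical stand-in for the COLUMNS_NOT_FOUND AnalysisException.
     final class ColumnsNotFound(val missing: Seq[String], available: Seq[String])
       extends RuntimeException(
         "missing: " + missing.mkString(", ") +
           "; available: " + available.mkString(", "))

     // Validates the requested names and returns them as a set, so callers
     // do not rebuild the set after validation.
     def validatedNameSet(requested: Seq[String], fieldNames: Seq[String]): Set[String] = {
       val available = fieldNames.toSet
       val missing = requested.filterNot(n => available.contains(n)).distinct
       if (missing.nonEmpty) throw new ColumnsNotFound(missing, fieldNames)
       requested.toSet
     }

     def include(fieldNames: Seq[String], requested: Seq[String]): Seq[String] = {
       val keep = validatedNameSet(requested, fieldNames)
       fieldNames.filter(n => keep.contains(n))
     }

     def exclude(fieldNames: Seq[String], requested: Seq[String]): Seq[String] = {
       val drop = validatedNameSet(requested, fieldNames)
       fieldNames.filterNot(n => drop.contains(n))
     }
   }
   ```
   Both branches then share one validation call and one set, and the result 
still follows the original field order.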



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A column reference that must be a single, unqualified identifier (no nested 
field path and
+ * no table/alias qualifier). The constructor parses [[name]] with the Spark 
SQL parser and
+ * throws an [[AnalysisException]] if it does not resolve to exactly one name 
part.
+ */
+case class UnqualifiedColumnName(name: String) {
+  UnqualifiedColumnName.validate(name)
+}
+
+object UnqualifiedColumnName {
+  private def validate(columnName: String): Unit = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(columnName, nameParts)
+    }
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = 
"AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+  type ColumnList = Seq[UnqualifiedColumnName]
+
+  case class IncludeColumns(columns: ColumnList) extends ColumnSelection
+  case class ExcludeColumns(columns: ColumnList) extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema.
+   * Field names are matched exactly. Field order follows the original schema 
(filtered in place).
+   */
+  def applyToSchema(schema: StructType, columnSelection: 
Option[ColumnSelection]): StructType =
+    columnSelection match {
+      case None =>
+        // A none column selection is interpreted as a no-op.
+        schema
+      case Some(IncludeColumns(includeColumns)) =>
+        validateColumnsExistInSchema(includeColumns, schema)
+
+        val includeColumnSet = includeColumns.map(_.name).toSet
+        StructType(schema.fields.filter(f => 
includeColumnSet.contains(f.name)))
+      case Some(ExcludeColumns(excludeColumns)) =>
+        validateColumnsExistInSchema(excludeColumns, schema)
+
+        val excludeColumnSet = excludeColumns.map(_.name).toSet
+        StructType(schema.fields.filterNot(f => 
excludeColumnSet.contains(f.name)))
+    }
+
+  private def validateColumnsExistInSchema(columns: ColumnList, schema: 
StructType): Unit = {
+    val schemaColumns = schema.fieldNames.toSet
+    val missingColumns = 
columns.map(_.name).filterNot(schemaColumns.contains).distinct
+    if (missingColumns.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
+        messageParameters = Map(
+          "missingColumns" -> missingColumns.mkString(", "),
+          "availableColumns" -> schema.fieldNames.mkString(", ")
+        ))
+    }
+  }
+}
+
+/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
+sealed trait ScdType
+
+object ScdType {
+  /** Representation for the standard SCD1 strategy. */
+  case object Type1 extends ScdType
+  /** Representation for the standard SCD2 strategy. */
+  case object Type2 extends ScdType
+}
+
+/**
+ * Configuration for an AutoCDC flow.
+ *
+ * @param keys            The column(s) that uniquely identify a row in the 
source data.
+ * @param sequencing      Expression ordering CDC events to correctly resolve 
out-of-order
+ *                        arrivals. Must be a sortable type.
+ * @param deleteCondition Expression that marks a source row as a DELETE. When 
None, all
+ *                        rows are treated as upserts.
+ * @param storedAsScdType The SCD strategy these args should be applied to.
+ * @param columnSelection Which source columns to select in the target table. 
None means
+ *                        all columns.
+ */
+case class ChangeArgs(
+    keys: Seq[String],
+    sequencing: Column,
+    storedAsScdType: ScdType,
+    deleteCondition: Option[Column] = None,
+    columnSelection: Option[ColumnSelection] = None
+)

Review Comment:
   Nit: `ChangeArgs` is a `case class` carrying two `Column` fields. `Column` 
does not have a content-based `equals`/`hashCode` (it compares by reference / 
underlying expression node identity), so two `ChangeArgs` built from equivalent 
but separately constructed `Column` instances will compare unequal. If any 
downstream code plans to use `ChangeArgs` as a Map key, deduplicate it, or 
assert equality in tests, this will be surprising. Either (a) document this 
caveat, or (b) override `equals`/`hashCode` to compare 
`sequencing.expr.canonicalized` / `deleteCondition.map(_.expr.canonicalized)`.



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class ChangeArgsSuite extends SparkFunSuite {
+
+  private val sourceSchema = new StructType()
+    .add("id", IntegerType, nullable = false)
+    .add("Name", StringType)
+    .add("age", IntegerType)
+
+  test("ColumnSelection None leaves schema unchanged") {
+    assert(ColumnSelection.applyToSchema(sourceSchema, None) == sourceSchema)
+  }
+
+  test("ColumnSelection IncludeColumns filters by exact name in schema order") 
{
+    val filteredSchema = ColumnSelection.applyToSchema(
+      sourceSchema,
+      Some(ColumnSelection.IncludeColumns(
+        Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")))))
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection ExcludeColumns filters by exact name") {
+    val filteredSchema = ColumnSelection.applyToSchema(
+      sourceSchema,
+      Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))))
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection IncludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          sourceSchema,
+          // Column inclusion is case-sensitive; "name" will not match against 
"Name".
+          Some(ColumnSelection.IncludeColumns(
+            Seq(UnqualifiedColumnName("name"), 
UnqualifiedColumnName("missing"))))
+        )
+      },
+      condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
+      sqlState = "42703",
+      parameters = Map(
+        "missingColumns" -> "name, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection ExcludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          sourceSchema,
+          // Column exclusion is case-sensitive; "NAME" will not match against 
"Name".
+          Some(ColumnSelection.ExcludeColumns(
+            Seq(UnqualifiedColumnName("NAME"), 
UnqualifiedColumnName("missing"))))
+        )
+      },
+      condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND",
+      sqlState = "42703",
+      parameters = Map(
+        "missingColumns" -> "NAME, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName accepts a simple single-part identifier") {
+    assert(UnqualifiedColumnName("col").name == "col")
+  }
+
+  test("UnqualifiedColumnName accepts a backtick-quoted name containing a 
literal dot") {
+    // Backticks make the dot part of a single name part, so this passes 
validation.
+    assert(UnqualifiedColumnName("`a.b`").name == "`a.b`")

Review Comment:
   Suggestion: add a positive test exercising `ColumnSelection.applyToSchema` 
with ``UnqualifiedColumnName("`a.b`")`` against a `StructType` that contains a 
field literally named `a.b`. As written, this raises `COLUMNS_NOT_FOUND` today 
(see the comment on `UnqualifiedColumnName`), which is probably not the intended 
behavior. A round-trip test would have caught it.
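   
   The round trip can be illustrated without Spark. In the sketch below, 
`parseParts` is a hypothetical, heavily simplified stand-in for 
`CatalystSqlParser.parseMultipartIdentifier` (it only handles dots and backticks, 
with no escape handling), and `normalize` and `include` are illustrative names 
operating on plain strings rather than the PR's types.
   ```scala
   object RoundTripSketch {
     // Hypothetical, simplified stand-in for parseMultipartIdentifier: splits
     // on dots outside backticks and drops the backticks. No escape handling.
     def parseParts(input: String): Seq[String] = {
       val parts = scala.collection.mutable.ArrayBuffer.empty[String]
       val current = new StringBuilder
       var quoted = false
       input.foreach { c =>
         if (c == '`') quoted = !quoted
         else if (c == '.' && !quoted) { parts += current.toString; current.clear() }
         else current.append(c)
       }
       parts += current.toString
       parts.toList
     }

     // Normalizing construction: keeps the parsed name, not the raw input.
     def normalize(input: String): String = {
       val parts = parseParts(input)
       require(parts.length == 1, "expected a single name part: " + input)
       parts.head
     }

     // Exact-match include that fails when a requested name is not a field.
     def include(fieldNames: Seq[String], names: Seq[String]): Seq[String] = {
       val missing = names.filterNot(n => fieldNames.contains(n))
       require(missing.isEmpty, "columns not found: " + missing.mkString(", "))
       fieldNames.filter(n => names.contains(n))
     }
   }
   ```
   Passing the raw, backtick-quoted string to `include` fails against a field 
named `a.b`, while passing the normalized name selects it. A suite test against 
the real `applyToSchema` would assert the same two outcomes.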



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

