szehon-ho commented on code in PR #55836:
URL: https://github.com/apache/spark/pull/55836#discussion_r3268465492


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A single, unqualified column identifier (no nested path or table/alias 
qualifier). Backticks
+ * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for 
direct schema-fieldName
+ * comparison and [[quoted]] for APIs that re-parse identifier strings.
+ */
+case class UnqualifiedColumnName private (name: String) {
+  def quoted: String = QuotingUtils.quoteIdentifier(name)
+}
+
+object UnqualifiedColumnName {
+  def apply(input: String): UnqualifiedColumnName = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(input)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(input, nameParts)
+    }
+    new UnqualifiedColumnName(nameParts.head)
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+
+  case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends 
ColumnSelection
+  case class ExcludeColumns(columns: Seq[UnqualifiedColumnName])
+      extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema. Field
+   * order follows the original schema; filtering happens in place.

Review Comment:
   Nit: "filtering happens in place" is a bit misleading — this returns a new 
`StructType` and does not mutate `schema`. Suggest something like:
   
   "Field order follows the original schema; only matching fields are retained 
in the returned schema."



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A single, unqualified column identifier (no nested path or table/alias 
qualifier). Backticks
+ * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for 
direct schema-fieldName
+ * comparison and [[quoted]] for APIs that re-parse identifier strings.
+ */
+case class UnqualifiedColumnName private (name: String) {
+  def quoted: String = QuotingUtils.quoteIdentifier(name)
+}
+
+object UnqualifiedColumnName {
+  def apply(input: String): UnqualifiedColumnName = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(input)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(input, nameParts)
+    }
+    new UnqualifiedColumnName(nameParts.head)
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+
+  case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends 
ColumnSelection
+  case class ExcludeColumns(columns: Seq[UnqualifiedColumnName])
+      extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema. Field
+   * order follows the original schema; filtering happens in place.
+   */
+  def applyToSchema(
+      schemaName: String,
+      schema: StructType,
+      columnSelection: Option[ColumnSelection],
+      ignoreCase: Boolean): StructType = columnSelection match {

Review Comment:
   Could you add a `@param ignoreCase` here? Callers should pass the inverse of 
the session's case sensitivity, e.g. 
`!session.sessionState.conf.caseSensitiveAnalysis`, so column matching stays 
consistent with `spark.sql.caseSensitive` (same pattern as `StaxXmlParser` 
using `getFieldIndex` vs `getFieldIndexCaseInsensitive`).
   
   Optional follow-up: consider renaming to `caseSensitive` (and inverting the 
branch) to match `SQLConf.caseSensitiveAnalysis` / catalyst naming — not 
blocking.



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
+
+  private val sourceSchema = new StructType()
+    .add("id", IntegerType, nullable = false)
+    .add("Name", StringType)
+    .add("age", IntegerType)
+
+  test("ColumnSelection None leaves schema unchanged") {
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = None,
+        ignoreCase = false
+      ) == sourceSchema)
+  }
+
+  test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") {
+    // An explicit empty include-list is semantically distinct from None: it 
means "select
+    // no columns" and produces an empty StructType, not the original schema.
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)),
+        ignoreCase = false
+      ) == new StructType())
+  }
+
+  test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") {
+    // An empty exclude-list is a no-op: nothing to remove, so the original 
schema is
+    // returned unchanged (same observable behavior as None for this case).
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)),
+        ignoreCase = false
+      ) == sourceSchema)
+  }
+
+  test("ColumnSelection IncludeColumns filters by exact name in schema order") 
{
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))
+        )
+      ),
+      ignoreCase = false
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection ExcludeColumns filters by exact name") {
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))
+      ),
+      ignoreCase = false
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection IncludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // Under ignoreCase = false, "name" will not match the schema field 
"Name".
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("name"), 
UnqualifiedColumnName("missing"))
+            )
+          ),
+          ignoreCase = false
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "name, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection ExcludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // Under ignoreCase = false, "NAME" will not match the schema field 
"Name".
+          columnSelection = Some(
+            ColumnSelection.ExcludeColumns(
+              Seq(UnqualifiedColumnName("NAME"), 
UnqualifiedColumnName("missing"))
+            )
+          ),
+          ignoreCase = false
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "NAME, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection IncludeColumns matches case-insensitively under 
ignoreCase=true") {
+    // "NAME" and "AGE" do not exactly match the schema fields "Name" and 
"age", but
+    // ignoreCase = true folds both sides to lowercase before comparing.
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME"))
+        )
+      ),
+      ignoreCase = true
+    )
+
+    // The retained fields keep their original casing from the schema, not the 
user's input.
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection deduplicates user-provided columns that normalize to 
the same name") {
+    // Under ignoreCase = true, "name" and "NAME" both fold to "name" and 
refer to the same
+    // schema field. The returned schema must include "Name" once, not twice. 
Output ordering
+    // and casing follow the schema, not the user's input.
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME"))
+        )
+      ),
+      ignoreCase = true
+    )
+
+    assert(filteredSchema == new StructType().add("Name", StringType))
+  }
+
+  test("ColumnSelection ExcludeColumns matches case-insensitively under 
ignoreCase=true") {
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+      ),
+      ignoreCase = true
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection missing-column error under ignoreCase=true preserves 
user casing") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no 
schema match.
+          // The error message reports the user's original casing for the 
missing column and
+          // the schema's original casing for the available columns.
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("NAME"), 
UnqualifiedColumnName("Missing"))
+            )
+          ),
+          ignoreCase = true
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "Missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName accepts a simple single-part identifier") {
+    assert(UnqualifiedColumnName("col").name == "col")
+    // .quoted always wraps in back-ticks, even when the input had none.
+    assert(UnqualifiedColumnName("col").quoted == "`col`")
+  }
+
+  test("UnqualifiedColumnName accepts a backtick-quoted name containing a 
literal dot") {
+    // Backticks make the dot part of a single name part, so this passes 
validation. The
+    // stored name is the parsed (unquoted) form so it matches the actual 
schema field name.
+    assert(UnqualifiedColumnName("`a.b`").name == "a.b")
+    // .quoted re-wraps the parsed name in back-ticks, round-tripping back to 
the input form.
+    assert(UnqualifiedColumnName("`a.b`").quoted == "`a.b`")
+  }
+
+  test("UnqualifiedColumnName accepts redundant backticks around a single-part 
name") {
+    // Backticks around an already-single-part identifier are decorative; the 
parser strips them
+    // so the stored name has no surrounding back-ticks.
+    assert(UnqualifiedColumnName("`col`").name == "col")
+    // .quoted re-wraps the parsed name in back-ticks, round-tripping back to 
the input form.
+    assert(UnqualifiedColumnName("`col`").quoted == "`col`")
+  }

Review Comment:
   We have ``UnqualifiedColumnName("`col`")`` and case-insensitive 
`IncludeColumns` tests, but not the combination: backtick-quoted input against 
a mixed-case schema field via `applyToSchema`.
   
   Suggest adding something like:
   
   ```scala
   test("ColumnSelection IncludeColumns accepts backtick-quoted mixed-case 
column") {
     val filtered = ColumnSelection.applyToSchema(
       schemaName = "test",
       schema = sourceSchema, // has field "Name"
       columnSelection = Some(
         ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`Name`")))),
       ignoreCase = false)
     assert(filtered == new StructType().add("Name", StringType))
   }
   ```
   
   This exercises the main lookup path, not just `UnqualifiedColumnName.apply`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to