szehon-ho commented on code in PR #55836: URL: https://github.com/apache/spark/pull/55836#discussion_r3268465492
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.types.StructType + +/** + * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks + * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct schema-fieldName + * comparison and [[quoted]] for APIs that re-parse identifier strings. 
+ */ +case class UnqualifiedColumnName private (name: String) { + def quoted: String = QuotingUtils.quoteIdentifier(name) +} + +object UnqualifiedColumnName { + def apply(input: String): UnqualifiedColumnName = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(input) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(input, nameParts) + } + new UnqualifiedColumnName(nameParts.head) + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + + case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection + case class ExcludeColumns(columns: Seq[UnqualifiedColumnName]) + extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. Field + * order follows the original schema; filtering happens in place. Review Comment: Nit: "filtering happens in place" is a bit misleading — this returns a new `StructType` and does not mutate `schema`. Suggest something like: "Field order follows the original schema; only matching fields are retained in the returned schema." ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.types.StructType + +/** + * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks + * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct schema-fieldName + * comparison and [[quoted]] for APIs that re-parse identifier strings. + */ +case class UnqualifiedColumnName private (name: String) { + def quoted: String = QuotingUtils.quoteIdentifier(name) +} + +object UnqualifiedColumnName { + def apply(input: String): UnqualifiedColumnName = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(input) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(input, nameParts) + } + new UnqualifiedColumnName(nameParts.head) + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + + case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection + case class ExcludeColumns(columns: Seq[UnqualifiedColumnName]) + extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a 
[[StructType]] and returns the filtered schema. Field + * order follows the original schema; filtering happens in place. + */ + def applyToSchema( + schemaName: String, + schema: StructType, + columnSelection: Option[ColumnSelection], + ignoreCase: Boolean): StructType = columnSelection match { Review Comment: Could you add a `@param ignoreCase` here? Callers should pass session case sensitivity, e.g. `!session.sessionState.conf.caseSensitiveAnalysis`, so column matching stays consistent with `spark.sql.caseSensitive` (same pattern as `StaxXmlParser` using `getFieldIndex` vs `getFieldIndexCaseInsensitive`). Optional follow-up: consider renaming to `caseSensitive` (and inverting the branch) to match `SQLConf.caseSensitiveAnalysis` / catalyst naming — not blocking. ########## sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala: ########## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, AnalysisException, Row} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { + + private val sourceSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("Name", StringType) + .add("age", IntegerType) + + test("ColumnSelection None leaves schema unchanged") { + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = None, + ignoreCase = false + ) == sourceSchema) + } + + test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") { + // An explicit empty include-list is semantically distinct from None: it means "select + // no columns" and produces an empty StructType, not the original schema. + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)), + ignoreCase = false + ) == new StructType()) + } + + test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") { + // An empty exclude-list is a no-op: nothing to remove, so the original schema is + // returned unchanged (same observable behavior as None for this case). 
+ assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)), + ignoreCase = false + ) == sourceSchema) + } + + test("ColumnSelection IncludeColumns filters by exact name in schema order") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")) + ) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection ExcludeColumns filters by exact name") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection IncludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // Under ignoreCase = false, "name" will not match the schema field "Name". 
+ columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", + "missingColumns" -> "name, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection ExcludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // Under ignoreCase = false, "NAME" will not match the schema field "Name". + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", + "missingColumns" -> "NAME, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection IncludeColumns matches case-insensitively under ignoreCase=true") { + // "NAME" and "AGE" do not exactly match the schema fields "Name" and "age", but + // ignoreCase = true folds both sides to lowercase before comparing. + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + // The retained fields keep their original casing from the schema, not the user's input. 
+ assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection deduplicates user-provided columns that normalize to the same name") { + // Under ignoreCase = true, "name" and "NAME" both fold to "name" and refer to the same + // schema field. The returned schema must include "Name" once, not twice. Output ordering + // and casing follow the schema, not the user's input. + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType().add("Name", StringType)) + } + + test("ColumnSelection ExcludeColumns matches case-insensitively under ignoreCase=true") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType() + .add("id", IntegerType, nullable = false) + .add("age", IntegerType)) + } + + test("ColumnSelection missing-column error under ignoreCase=true preserves user casing") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no schema match. + // The error message reports the user's original casing for the missing column and + // the schema's original casing for the available columns. 
+ columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("Missing")) + ) + ), + ignoreCase = true + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "schemaName" -> "test", + "missingColumns" -> "Missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("UnqualifiedColumnName accepts a simple single-part identifier") { + assert(UnqualifiedColumnName("col").name == "col") + // .quoted always wraps in back-ticks, even when the input had none. + assert(UnqualifiedColumnName("col").quoted == "`col`") + } + + test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { + // Backticks make the dot part of a single name part, so this passes validation. The + // stored name is the parsed (unquoted) form so it matches the actual schema field name. + assert(UnqualifiedColumnName("`a.b`").name == "a.b") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. + assert(UnqualifiedColumnName("`a.b`").quoted == "`a.b`") + } + + test("UnqualifiedColumnName accepts redundant backticks around a single-part name") { + // Backticks around an already-single-part identifier are decorative; the parser strips them + // so the stored name has no surrounding back-ticks. + assert(UnqualifiedColumnName("`col`").name == "col") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. + assert(UnqualifiedColumnName("`col`").quoted == "`col`") + } Review Comment: We have `UnqualifiedColumnName("`col`")` and case-insensitive `IncludeColumns` tests, but not the combination: backtick-quoted input against a mixed-case schema field via `applyToSchema`. 
Suggest adding something like:

```scala
test("ColumnSelection IncludeColumns accepts backtick-quoted mixed-case column") {
  val filtered = ColumnSelection.applyToSchema(
    schemaName = "test",
    schema = sourceSchema, // has field "Name"
    columnSelection = Some(
      ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`Name`")))),
    ignoreCase = false)
  assert(filtered == new StructType().add("Name", StringType))
}
```

This exercises the main lookup path, not just `UnqualifiedColumnName.apply`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
