longvu-db commented on code in PR #56619:
URL: https://github.com/apache/spark/pull/56619#discussion_r3477684973


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -967,7 +967,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends 
LeafV2CommandExec {
   protected def getV2Columns(schema: StructType, forceNullable: Boolean): 
Array[Column] = {
     val rawSchema = 
CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf)
     val tableSchema = if (forceNullable) rawSchema.asNullable else rawSchema
-    CatalogV2Util.structTypeToV2Columns(tableSchema)
+    CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds = false)

Review Comment:
   ```suggestion
       CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -1156,6 +1153,7 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -28,10 +28,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, 
Row, SaveMode, SparkS
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, 
GenerationExpression, Identifier, InMemoryTableCatalog, 
MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, 
NullTableIdAndNullColumnIdInMemoryTableCatalog, 
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, 
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
-import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
UpdateColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, 
CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, 
DefaultValue, GenerationExpression, Identifier, InMemoryBaseTable, 
InMemoryTableCatalog, MixedColumnIdTableCatalog, 
NullColumnIdInMemoryTableCatalog, 
NullTableIdAndNullColumnIdInMemoryTableCatalog, 
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, 
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}

Review Comment:
   Do we have a test where the connector implements only the top-level column 
ID but not the nested struct field IDs? I assume that case is covered by 
`sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala`?



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2905,9 +2825,15 @@ class DataSourceV2DataFrameSuite
       // reset-id catalog assigns a new ID for the widened column
       checkError(
         exception = intercept[AnalysisException] { df.collect() },
-        condition = 
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+        condition = 
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
         matchPVals = true,
-        parameters = Map("tableName" -> ".*", "errors" -> ".*"))
+        parameters = Map(
+          "tableName" -> ".*",
+          "errors" ->
+            """|
+               |- `salary` field ID has changed from \d+ to \d+

Review Comment:
   "`salary` field ID has changed from \d+ to \d+"
   
   Does something look wrong here? That is, the field ID doesn't seem to have 
changed, and it is also a peculiar field ID name.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala:
##########
@@ -57,22 +55,25 @@ private[sql] object V2TableUtil extends SQLConfHelper {
    * Validates that captured data columns match the current table schema.
    *
    * Checks for:
+   *  - Column ID changes (top-level and nested field IDs)
    *  - Column type or nullability changes
    *  - Removed columns (missing from the current table schema)
    *  - Added columns (new in the current table schema)
    *
    * @param table the current table metadata
    * @param originCols the originally captured columns
    * @param mode validation mode that defines what changes are acceptable
+   * @param checkIds whether to check field IDs
    * @return validation errors, or empty sequence if valid
    */
   def validateCapturedColumns(
       table: Table,
       originCols: Seq[Column],
-      mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = {
+      mode: SchemaValidationMode = PROHIBIT_CHANGES,
+      checkIds: Boolean = true): Seq[String] = {

Review Comment:
   ```suggestion
         checkIds: Boolean): Seq[String] = {
   ```
   
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala:
##########
@@ -522,6 +536,38 @@ private[spark] object SchemaUtils {
       fields.map(field => field.name.toLowerCase(Locale.ROOT) -> field).toMap
     }
   }
+
+  /**
+   * Recursively clears field IDs from a data type.
+   */
+  def clearFieldIds(dataType: DataType): DataType = dataType match {

Review Comment:
   Nit: I wonder whether we should also call an array element and a map key/value a "field".



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala:
##########
@@ -105,83 +106,26 @@ private[sql] object V2TableUtil extends SQLConfHelper {
    * Validates that captured metadata columns are consistent with the current 
table metadata.
    *
    * Checks for:
+   *  - Column ID changes (top-level and nested field IDs)
    *  - Metadata column type or nullability changes
    *  - Removed metadata columns (missing from current table)
    *
    * @param table the current table metadata
    * @param originMetaCols the originally captured metadata columns
    * @param mode validation mode that defines what changes are acceptable
+   * @param checkIds whether to check IDs
    * @return validation errors, or empty sequence if valid
    */
   def validateCapturedMetadataColumns(
       table: Table,
       originMetaCols: Seq[MetadataColumn],
-      mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = {
+      mode: SchemaValidationMode = PROHIBIT_CHANGES,
+      checkIds: Boolean = true): Seq[String] = {

Review Comment:
   ```suggestion
         checkIds: Boolean): Seq[String] = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to