longvu-db commented on code in PR #56619:
URL: https://github.com/apache/spark/pull/56619#discussion_r3477684973
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -967,7 +967,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends
LeafV2CommandExec {
protected def getV2Columns(schema: StructType, forceNullable: Boolean):
Array[Column] = {
val rawSchema =
CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf)
val tableSchema = if (forceNullable) rawSchema.asNullable else rawSchema
- CatalogV2Util.structTypeToV2Columns(tableSchema)
+ CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds = false)
Review Comment:
```suggestion
CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds)
```
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -1156,6 +1153,7 @@ class DataSourceV2DataFrameSuite
}
}
+
Review Comment:
```suggestion
```
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -28,10 +28,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame,
Row, SaveMode, SparkS
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData,
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue,
GenerationExpression, Identifier, InMemoryTableCatalog,
MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog,
NullTableIdAndNullColumnIdInMemoryTableCatalog,
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable,
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
-import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn,
UpdateColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog,
CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue,
DefaultValue, GenerationExpression, Identifier, InMemoryBaseTable,
InMemoryTableCatalog, MixedColumnIdTableCatalog,
NullColumnIdInMemoryTableCatalog,
NullTableIdAndNullColumnIdInMemoryTableCatalog,
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable,
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
Review Comment:
Do we have a test where the connector implements only the top-level column
ID but not the nested struct field IDs? I assume that would be
`sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala`?
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2905,9 +2825,15 @@ class DataSourceV2DataFrameSuite
// reset-id catalog assigns a new ID for the widened column
checkError(
exception = intercept[AnalysisException] { df.collect() },
- condition =
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCH",
+ condition =
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
matchPVals = true,
- parameters = Map("tableName" -> ".*", "errors" -> ".*"))
+ parameters = Map(
+ "tableName" -> ".*",
+ "errors" ->
+ """|
+ |- `salary` field ID has changed from \d+ to \d+
Review Comment:
"`salary` field ID has changed from \d+ to \d+"
Does something look wrong here? The field ID doesn't seem to have changed,
and it's also a peculiar field ID name.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala:
##########
@@ -57,22 +55,25 @@ private[sql] object V2TableUtil extends SQLConfHelper {
* Validates that captured data columns match the current table schema.
*
* Checks for:
+ * - Column ID changes (top-level and nested field IDs)
* - Column type or nullability changes
* - Removed columns (missing from the current table schema)
* - Added columns (new in the current table schema)
*
* @param table the current table metadata
* @param originCols the originally captured columns
* @param mode validation mode that defines what changes are acceptable
+ * @param checkIds whether to check field IDs
* @return validation errors, or empty sequence if valid
*/
def validateCapturedColumns(
table: Table,
originCols: Seq[Column],
- mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = {
+ mode: SchemaValidationMode = PROHIBIT_CHANGES,
+ checkIds: Boolean = true): Seq[String] = {
Review Comment:
```suggestion
checkIds: Boolean): Seq[String] = {
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala:
##########
@@ -522,6 +536,38 @@ private[spark] object SchemaUtils {
fields.map(field => field.name.toLowerCase(Locale.ROOT) -> field).toMap
}
}
+
+ /**
+ * Recursively clears field IDs from a data type.
+ */
+ def clearFieldIds(dataType: DataType): DataType = dataType match {
Review Comment:
Nit: I wonder if we should also call an array element or a map key/value a "field".
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala:
##########
@@ -105,83 +106,26 @@ private[sql] object V2TableUtil extends SQLConfHelper {
* Validates that captured metadata columns are consistent with the current
table metadata.
*
* Checks for:
+ * - Column ID changes (top-level and nested field IDs)
* - Metadata column type or nullability changes
* - Removed metadata columns (missing from current table)
*
* @param table the current table metadata
* @param originMetaCols the originally captured metadata columns
* @param mode validation mode that defines what changes are acceptable
+ * @param checkIds whether to check IDs
* @return validation errors, or empty sequence if valid
*/
def validateCapturedMetadataColumns(
table: Table,
originMetaCols: Seq[MetadataColumn],
- mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = {
+ mode: SchemaValidationMode = PROHIBIT_CHANGES,
+ checkIds: Boolean = true): Seq[String] = {
Review Comment:
```suggestion
checkIds: Boolean): Seq[String] = {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]