cloud-fan commented on code in PR #55767:
URL: https://github.com/apache/spark/pull/55767#discussion_r3297563064


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -345,6 +355,26 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
 
         create.withPartitioning(normalizedPartitions)
       }
+
+    case a @ AddColumns(ResolvedTable(_, _, t, _), cols) if a.resolved =>
+      checkColumns(
+        Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
+        StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))

Review Comment:
   For nested `ADD COLUMNS (parent.child <unsupported>)`, `c.colName` is just 
`"child"`, so the `UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE` error reports 
``columnName -> `child` `` and loses the path context.
   
   ```suggestion
           StructType(cols.map(c => StructField(c.name.quoted, c.dataType, c.nullable))))
   ```
   (`ReplaceColumns` asserts `col.path.isEmpty`, so the leaf name equals the full name there, but applying the same form on line 368 keeps the two cases consistent.)
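
   To illustrate the difference, here is a minimal, self-contained sketch. `Col`, `quotePart`, and `quoted` are hypothetical stand-ins for Spark's column spec and identifier quoting (not the real classes), and the quoting rule shown is an assumption:

   ```scala
   // Toy model only: `Col`, `quotePart`, and `quoted` are hypothetical stand-ins,
   // not Spark's column spec class or its quoting helpers.
   object NestedColumnNameSketch {
     // A column to add: `path` is the parent field path (empty for top-level columns).
     final case class Col(path: Seq[String], colName: String) {
       def name: Seq[String] = path :+ colName
     }

     // Assumed rule: backtick-quote a name part only when it is not a plain identifier.
     private def quotePart(part: String): String =
       if (part.matches("[a-zA-Z0-9_]+")) part
       else "`" + part.replace("`", "``") + "`"

     def quoted(parts: Seq[String]): String = parts.map(quotePart).mkString(".")

     def main(args: Array[String]): Unit = {
       val nested = Col(Seq("parent"), "child")
       println(nested.colName)      // prints "child": the parent path is lost
       println(quoted(nested.name)) // prints "parent.child": the full path survives
     }
   }
   ```

   With the leaf name, two nested columns under different parents would produce identical error messages; with the full name, they stay distinguishable.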



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -345,6 +355,26 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
 
         create.withPartitioning(normalizedPartitions)
       }
+
+    case a @ AddColumns(ResolvedTable(_, _, t, _), cols) if a.resolved =>
+      checkColumns(
+        Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
+        StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))
+      a
+
+    case r @ ReplaceColumns(ResolvedTable(_, _, t, _), cols) if r.resolved =>
+      checkColumns(
+        Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
+        StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))
+      r

Review Comment:
   Fix completeness: `ALTER TABLE ALTER COLUMN ... TYPE <unsupported>` can re-introduce the same broken-schema state. V1 is naturally protected (`canEvolveType` only permits compatible-collation evolution), but V2 `AlterColumns` with `newDataType` flows to the connector via `TableChange.updateColumnType` with no schema check. Is this intentionally out of scope for this PR (to keep the surface contained), or is it worth covering here? If it is out of scope, a `TODO` near the new cases would help.
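
   As a rough illustration of what covering the type-change path could look like, here is a self-contained toy sketch. Every name in it (`Change`, `AddColumn`, `UpdateColumnType`, `unsupported`, `rejected`, the `toyfmt` provider and its `unsupported_t` type) is hypothetical and none of it is Spark's API; it only shows the add and type-change paths going through the same check:

   ```scala
   // Toy model only: all names here are hypothetical, not Spark classes.
   object AlterTypeCheckSketch {
     sealed trait Change
     final case class AddColumn(name: String, dataType: String) extends Change
     final case class UpdateColumnType(name: String, newDataType: String) extends Change

     // Assumed for illustration: the types each provider cannot store.
     val unsupported: Map[String, Set[String]] = Map("toyfmt" -> Set("unsupported_t"))

     // Returns the names of columns whose (new) type the provider rejects.
     def rejected(provider: String, changes: Seq[Change]): Seq[String] = {
       val bad = unsupported.getOrElse(provider, Set.empty[String])
       changes.collect {
         case AddColumn(n, t) if bad(t) => n
         case UpdateColumnType(n, t) if bad(t) => n // the path that is unchecked today
       }
     }
   }
   ```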



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -345,6 +355,26 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
 
         create.withPartitioning(normalizedPartitions)
       }
+
+    case a @ AddColumns(ResolvedTable(_, _, t, _), cols) if a.resolved =>
+      checkColumns(
+        Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
+        StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))
+      a
+
+    case r @ ReplaceColumns(ResolvedTable(_, _, t, _), cols) if r.resolved =>
+      checkColumns(
+        Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
+        StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))
+      r
+  }
+
+  /** Checks the schema is supported by the destination [[FileFormat]]. */

Review Comment:
   ```suggestion
     /** Checks that the schema is supported by the destination [[FileFormat]]. */
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##########
@@ -345,6 +355,26 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
 
         create.withPartitioning(normalizedPartitions)
       }
+
+    case a @ AddColumns(ResolvedTable(_, _, t, _), cols) if a.resolved =>

Review Comment:
   The V2 sites trigger for any `ResolvedTable(_, _, t, _)`, but resolve the 
provider through Spark's built-in `DataSource.lookupDataSource`. That assumes 
`USING <p>` has the same meaning regardless of which V2 catalog the table lives 
in. For non-session V2 catalogs that interpret `USING csv` (or `USING parquet`) 
with their own rules — Iceberg, Delta, in-house connectors — this rejects 
schemas the catalog might support.
   
   V2 already has the right distinction for this: `MetadataTable` is the V2 
contract for "I'm a passthrough — interpret my `provider` using Spark's 
built-in data sources" (its class javadoc spells this out). Combined with 
`V1Table` (session catalog wrapping a V1 `CatalogTable`), those are the two 
table types where `USING csv` is *guaranteed* to mean `CSVFileFormat`.
   
   Suggest gating the check on table type, e.g.:
   ```scala
       case a @ AddColumns(ResolvedTable(_, _, t @ (_: V1Table | _: MetadataTable), _), cols)
           if a.resolved =>
         checkColumns(
           Option(t.properties().get(TableCatalog.PROP_PROVIDER)),
           StructType(cols.map(c => StructField(c.colName, c.dataType, c.nullable))))
         a
   ```
   The same shape applies to `ReplaceColumns` (line 365). The `V2CreateTablePlan` case (line 333) can't gate on table type because the table isn't created yet, so the analogous fix there is to gate on the catalog, e.g., only run `checkColumns` when the resolved catalog is the session catalog. Non-session V2 catalogs are then responsible for their own schema validation.
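
   For reference, the type-gated match can be tried in isolation with a toy model. The classes below are hypothetical stand-ins that only borrow the names `V1Table` and `MetadataTable`; `ConnectorTable` and `providerToCheck` are invented for the sketch and are not Spark APIs:

   ```scala
   // Toy model only: these types are hypothetical stand-ins for Spark's table
   // classes, used to show the shape of the type-gated pattern match.
   object TableTypeGateSketch {
     sealed trait Table { def provider: Option[String] }
     final case class V1Table(provider: Option[String]) extends Table
     final case class MetadataTable(provider: Option[String]) extends Table
     // Stands in for a table owned by a non-session V2 catalog.
     final case class ConnectorTable(provider: Option[String]) extends Table

     // Returns the provider to validate against, or None when the owning
     // catalog is left to do its own schema validation.
     def providerToCheck(table: Table): Option[String] = table match {
       case t @ (_: V1Table | _: MetadataTable) => t.provider
       case _ => None
     }

     def main(args: Array[String]): Unit = {
       println(providerToCheck(V1Table(Some("csv"))))        // prints "Some(csv)"
       println(providerToCheck(ConnectorTable(Some("csv")))) // prints "None"
     }
   }
   ```

   The pattern alternative `t @ (_: V1Table | _: MetadataTable)` binds `t` only for the two passthrough table types, so every other table falls through to the default case and is never checked against the built-in data sources.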



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

