longvu-db commented on code in PR #56619:
URL: https://github.com/apache/spark/pull/56619#discussion_r3474657759


##########
sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala:
##########
@@ -243,6 +244,43 @@ case class StructField(
     metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
   }
 
+  /**
+   * Updates the field with an ID for column identity tracking.
+   */
+  def withId(id: String): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putString(FIELD_ID_METADATA_KEY, id)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
+  /**
+   * Returns the ID of this field, if set.
+   */
+  def id: Option[String] = {
+    if (metadata.contains(FIELD_ID_METADATA_KEY)) {
+      Some(metadata.getString(FIELD_ID_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Returns a copy of this field with the field ID removed, or this field if 
no ID is set.
+   */
+  def clearId(): StructField = {

Review Comment:
   1. Something tells me we should either return a copy in both scenarios, or 
in-place modification in both scenarios, for consistency.
   
   2. P/s: if someone calls `field.clearId()`, it feels more intuitive as 
in-place modification for me, but I'm okay with this reutrning a copy
   
   3. Also if someone does
   a. `fieldCopy = field.clearId() (where no ID is set)
   b. `modification_to_the_fieldCopy`
   
   Since currently, we return the field itself, would 
`modification_to_the_fieldCopy` modifies both the `fieldCopy` and `field`, sort 
of like indirect modification?



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression 
columnGenerationExpression() {
    * others.
    * <p>
    * This API covers top-level columns only. Nested struct fields, array 
elements, and map
-   * keys/values do not have separate IDs. Connectors that track nested field 
IDs can encode
-   * them into the returned top-level Column ID string to detect nested 
changes, since Spark
-   * only compares string equality.
+   * keys/values carry their own IDs in struct field metadata. Spark validates 
both top-level and
+   * nested field IDs as part of schema compatibility checks. See {@link 
StructField#id()}.

Review Comment:
   ```suggestion
      * nested struct field IDs as part of schema compatibility checks (array 
elements and map/key values' validation is not supported yet). See {@link 
StructField#id()}.
   ```
   
   



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression 
columnGenerationExpression() {
    * others.
    * <p>
    * This API covers top-level columns only. Nested struct fields, array 
elements, and map
-   * keys/values do not have separate IDs. Connectors that track nested field 
IDs can encode
-   * them into the returned top-level Column ID string to detect nested 
changes, since Spark
-   * only compares string equality.
+   * keys/values carry their own IDs in struct field metadata. Spark validates 
both top-level and
+   * nested field IDs as part of schema compatibility checks. See {@link 
StructField#id()}.
    */
   @Nullable
   default String id() {
     return null;
   }
+
+  /**
+   * A builder for {@link Column}.
+   *
+   * @since 4.2.0
+   */
+  class Builder {
+    private final String name;
+    private DataType dataType;
+    private boolean nullable = true;
+    private String comment = null;
+    private ColumnDefaultValue defaultValue = null;
+    private GenerationExpression genExpr = null;
+    private IdentityColumnSpec identityColumnSpec = null;
+    private String metadataInJSON = null;
+    private String id = null;
+
+    private Builder(String name, DataType dataType) {
+      this.name = Objects.requireNonNull(name, "name must not be null");
+      this.dataType = Objects.requireNonNull(dataType, "dataType must not be 
null");
+    }
+
+    public Builder nullable(boolean nullable) {
+      this.nullable = nullable;
+      return this;
+    }
+
+    public Builder comment(String comment) {
+      this.comment = comment;
+      return this;
+    }
+
+    public Builder defaultValue(ColumnDefaultValue defaultValue) {
+      this.defaultValue = defaultValue;
+      return this;
+    }
+
+    public Builder generationExpression(String sql) {
+      this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+      return this;
+    }
+
+    public Builder generationExpression(GenerationExpression generationExpr) {
+      this.genExpr = generationExpr;
+      return this;
+    }
+
+    public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+      this.identityColumnSpec = identityColumnSpec;
+      return this;
+    }
+
+    public Builder metadata(String metadataInJSON) {
+      this.metadataInJSON = metadataInJSON;
+      return this;
+    }
+
+    public Builder id(String id) {
+      this.id = id;
+      return this;
+    }
+
+    public Builder clearFieldIds() {
+      this.id = null;

Review Comment:
   Is it a problem if the `clearId` above returns a copy, but `clearFieldIds` 
here doesn't?



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression 
columnGenerationExpression() {
    * others.
    * <p>
    * This API covers top-level columns only. Nested struct fields, array 
elements, and map
-   * keys/values do not have separate IDs. Connectors that track nested field 
IDs can encode
-   * them into the returned top-level Column ID string to detect nested 
changes, since Spark
-   * only compares string equality.
+   * keys/values carry their own IDs in struct field metadata. Spark validates 
both top-level and
+   * nested field IDs as part of schema compatibility checks. See {@link 
StructField#id()}.
    */
   @Nullable
   default String id() {
     return null;
   }
+
+  /**
+   * A builder for {@link Column}.
+   *
+   * @since 4.2.0
+   */
+  class Builder {
+    private final String name;
+    private DataType dataType;
+    private boolean nullable = true;
+    private String comment = null;
+    private ColumnDefaultValue defaultValue = null;
+    private GenerationExpression genExpr = null;
+    private IdentityColumnSpec identityColumnSpec = null;
+    private String metadataInJSON = null;
+    private String id = null;
+
+    private Builder(String name, DataType dataType) {
+      this.name = Objects.requireNonNull(name, "name must not be null");
+      this.dataType = Objects.requireNonNull(dataType, "dataType must not be 
null");
+    }
+
+    public Builder nullable(boolean nullable) {
+      this.nullable = nullable;
+      return this;
+    }
+
+    public Builder comment(String comment) {
+      this.comment = comment;
+      return this;
+    }
+
+    public Builder defaultValue(ColumnDefaultValue defaultValue) {
+      this.defaultValue = defaultValue;
+      return this;
+    }
+
+    public Builder generationExpression(String sql) {
+      this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+      return this;
+    }
+
+    public Builder generationExpression(GenerationExpression generationExpr) {
+      this.genExpr = generationExpr;
+      return this;
+    }
+
+    public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+      this.identityColumnSpec = identityColumnSpec;
+      return this;
+    }
+
+    public Builder metadata(String metadataInJSON) {
+      this.metadataInJSON = metadataInJSON;
+      return this;
+    }
+
+    public Builder id(String id) {
+      this.id = id;
+      return this;
+    }
+
+    public Builder clearFieldIds() {

Review Comment:
   `clearFieldIds` -> Should we rename it to `clearAllIds` or 
`clearColAndFieldIDs`?
   
   Because IMO the current name can imply only the nested field ID is cleared



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression 
columnGenerationExpression() {
    * others.
    * <p>
    * This API covers top-level columns only. Nested struct fields, array 
elements, and map
-   * keys/values do not have separate IDs. Connectors that track nested field 
IDs can encode
-   * them into the returned top-level Column ID string to detect nested 
changes, since Spark
-   * only compares string equality.
+   * keys/values carry their own IDs in struct field metadata. Spark validates 
both top-level and
+   * nested field IDs as part of schema compatibility checks. See {@link 
StructField#id()}.
    */
   @Nullable
   default String id() {
     return null;
   }
+
+  /**
+   * A builder for {@link Column}.
+   *
+   * @since 4.2.0
+   */
+  class Builder {
+    private final String name;
+    private DataType dataType;
+    private boolean nullable = true;
+    private String comment = null;
+    private ColumnDefaultValue defaultValue = null;
+    private GenerationExpression genExpr = null;
+    private IdentityColumnSpec identityColumnSpec = null;
+    private String metadataInJSON = null;
+    private String id = null;
+
+    private Builder(String name, DataType dataType) {
+      this.name = Objects.requireNonNull(name, "name must not be null");
+      this.dataType = Objects.requireNonNull(dataType, "dataType must not be 
null");
+    }
+
+    public Builder nullable(boolean nullable) {
+      this.nullable = nullable;
+      return this;
+    }
+
+    public Builder comment(String comment) {
+      this.comment = comment;
+      return this;
+    }
+
+    public Builder defaultValue(ColumnDefaultValue defaultValue) {
+      this.defaultValue = defaultValue;
+      return this;
+    }
+
+    public Builder generationExpression(String sql) {
+      this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+      return this;
+    }
+
+    public Builder generationExpression(GenerationExpression generationExpr) {
+      this.genExpr = generationExpr;
+      return this;
+    }
+
+    public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+      this.identityColumnSpec = identityColumnSpec;
+      return this;
+    }
+
+    public Builder metadata(String metadataInJSON) {
+      this.metadataInJSON = metadataInJSON;
+      return this;
+    }
+
+    public Builder id(String id) {
+      this.id = id;
+      return this;
+    }
+
+    public Builder clearFieldIds() {
+      this.id = null;
+      this.dataType = SchemaUtils.clearFieldIds(dataType);
+      return this;
+    }
+
+    public Column build() {
+      validateState();
+      return new ColumnImpl(
+          name, dataType, nullable, comment, defaultValue,
+          genExpr, identityColumnSpec, metadataInJSON, id);
+    }
+
+    private void validateState() {
+      if (hasConflictingDefinitions()) {
+        throw new SparkIllegalArgumentException(
+            "INTERNAL_ERROR",
+            Map.of("message",
+                "Column '" + name + "' cannot have more than one definition 
of: " +
+                "default value, generation expression, identity column spec"));

Review Comment:
   I assume in reality, this error shouldn't happen, so we cannot test it right?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -699,15 +721,21 @@ private[sql] object CatalogV2Util {
 
   /**
    * Converts a StructType to DS v2 columns, which decodes the StructField 
metadata to v2 column
-   * comment and default value or generation expression. This is mainly used 
to generate DS v2
-   * columns from table schema in DDL commands, so that Spark can pass DS v2 
columns to DS v2
-   * createTable and related APIs.
+   * comment, default value or generation expression, and column ID. This is 
mainly used to
+   * generate DS v2 columns from table schema in DDL commands, so that Spark 
can pass DS v2
+   * columns to DS v2 createTable and related APIs.
    */
-  def structTypeToV2Columns(schema: StructType): Array[Column] = {
-    schema.fields.map(structFieldToV2Column)
+  def structTypeToV2Columns(
+      schema: StructType,
+      keepFieldIds: Boolean = true): Array[Column] = {
+    schema.fields.map(structFieldToV2Column(_, keepFieldIds))
+  }
+
+  def clearFieldIds(columns: Array[Column]): Array[Column] = {

Review Comment:
   Ditto, I wonder if we should have a different name if we are also clearing 
top-level ID



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -721,6 +749,10 @@ private[sql] object CatalogV2Util {
       }.build()
     }
 
+    val id = if (keepFieldIds) f.id.orNull else null

Review Comment:
   ```suggestion
       val topLevelColId = if (keepFieldIds) f.id.orNull else null
   ```
   
   Nit: Currently it's a bit confusing to reason whether sth is meant for 
top-level col ID, or nested struct ID, or both, I think having a clear name 
would help



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala:
##########
@@ -449,10 +449,10 @@ trait DSv2IncrementallyConstructedQueryTests extends 
DSv2ExternalMutationTestBas
 
   // 
---------------------------------------------------------------------------
   // Scenario 6: external type change (drop INT column, add STRING column).
-  // The delete removes the old column ID and the add assigns a fresh one,
-  // so the column ID check fires (COLUMN_ID_MISMATCH) in classic before schema
-  // validation gets a chance to compare data types.
-  // Connect re-resolves both sides with the new column ID.
+  // The delete removes the old column and the add assigns a fresh one,
+  // so COLUMNS_MISMATCH fires in classic before schema validation gets a

Review Comment:
   schema validation is part of the `COLUMNS_MISMATCH` check, so I think it can 
be confusing to say `COLUMNS_MISMATCH` fires before schema validation, I think 
we should instead say "so column IDs check fires"



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2552,15 +2478,14 @@ class DataSourceV2DataFrameSuite
       assert(typeAfter.toString.startsWith("StructType(StructField(age"),
         s"age should be first field after reorder, got: $typeAfter")
 
-      // Position-based keys: each ordinal position keeps its old ID after
-      // reorder, so the composed string is unchanged despite the schema 
change.
+      // The top-level column ID is preserved after reorder.
       assert(idBefore == idAfter,
-        s"Composed ID should be unchanged after reorder: $idBefore vs 
$idAfter")
+        s"Top-level column ID should be unchanged after reorder: $idBefore vs 
$idAfter")
     }
   }
 
-  test("composed nested IDs tolerate nested field reorder end-to-end") {
-    val t = "composedidcat.ns1.ns2.tbl"
+  test("nested field reorder does not trigger column ID mismatch") {

Review Comment:
   > // InMemoryTable does not actually reorder nested struct fields in stored
         // data, so the read still returns the original field order. This is 
fine
         // because the purpose of this test is to verify that the column ID 
check
   
   Based on the comment, `InMemoryTable` does not actually do the re-ordering, 
it's a limitation
   
   => Let's try to fix `InMemoryTable` so that it actual does the re-ordering?
   
   > val nameParts = colPath :+ field.name
                 if (checkFieldIds) {
                   for (id <- field.id; otherId <- otherField.id if id != 
otherId) {
                     errors += s"${nameParts.fullyQuoted} field ID has changed 
from $id to $otherId"
                   }
                 }
   
   Cuz the schema validation validates nested columns based on name, not 
position in the struct
   
   => I'm fine if this name matching mechanism is already tested in some way.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3009,12 +3009,6 @@
           "<errors>"
         ]
       },
-      "COLUMN_ID_MISMATCH" : {

Review Comment:
   Like we have ComposedColumnIdTableCatalog, 
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala,....
 where we mock various connector scenarios, e.g. Connector that provide 
top-level column ID only, no struct column ID; Connector that provides both 
top-level col ID and struct col ID,....
   



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala:
##########
@@ -200,6 +200,20 @@ class ShowCreateTableSuite extends 
command.ShowCreateTableSuiteBase with Command
     }
   }
 
+  test("SPARK-57544: show create table does not expose column IDs; schema 
does") {
+    withNamespaceAndTable(ns, table) { t =>
+      sql(s"CREATE TABLE $t (id INT, salary INT) $defaultUsing")
+
+      // Column IDs assigned by the catalog must NOT appear in SHOW CREATE 
TABLE output.
+      val showDDL = getShowCreateDDL(t)
+      assert(!showDDL.exists(_.contains("__FIELD_ID")), s"command must not 
expose column IDs")

Review Comment:
   ```suggestion
         assert(!showDDL.exists(_.contains(FIELD_ID_METADATA_KEY)), s"command 
must not expose column IDs")
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -699,15 +721,21 @@ private[sql] object CatalogV2Util {
 
   /**
    * Converts a StructType to DS v2 columns, which decodes the StructField 
metadata to v2 column
-   * comment and default value or generation expression. This is mainly used 
to generate DS v2
-   * columns from table schema in DDL commands, so that Spark can pass DS v2 
columns to DS v2
-   * createTable and related APIs.
+   * comment, default value or generation expression, and column ID. This is 
mainly used to
+   * generate DS v2 columns from table schema in DDL commands, so that Spark 
can pass DS v2
+   * columns to DS v2 createTable and related APIs.
    */
-  def structTypeToV2Columns(schema: StructType): Array[Column] = {
-    schema.fields.map(structFieldToV2Column)
+  def structTypeToV2Columns(
+      schema: StructType,
+      keepFieldIds: Boolean = true): Array[Column] = {

Review Comment:
   ```suggestion
         keepFieldIds: Boolean): Array[Column] = {
   ```
   
   Let's remove the default so it forces call-sites to carefully adopt? 



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -615,6 +653,44 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("ReplaceTableAsSelect: field IDs in query schema are not propagated to 
table columns") {
+    val basicCatalog = catalog("testcat").asTableCatalog
+    val atomicCatalog = catalog("testcat_atomic").asTableCatalog
+    val basicIdentifier = "testcat.table_name"

Review Comment:
   Do we need to have withTable to clean up the `identifier`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -736,33 +768,45 @@ private[sql] object CatalogV2Util {
         assert(e.resolved && e.foldable,
           "The existence default value must be a simple SQL string that is 
resolved and " +
             "foldable, but got: " + f.getExistenceDefaultValue().get)
-        LiteralValue(e.eval(), f.dataType)
+        LiteralValue(e.eval(), dataType)
       } else {
         null
       }
       val defaultValue = new 
ColumnDefaultValue(f.getCurrentDefaultValue().get, existsDefault)
-      val cleanedMetadata = metadataWithKeysRemoved(
-        Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
EXISTS_DEFAULT_COLUMN_METADATA_KEY))
-      Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, 
defaultValue,
-        metadataAsJson(cleanedMetadata))
+      val cleanedMetadata = 
metadataWithKeysRemoved(REMOVED_DEFAULT_VALUE_METADATA_KEYS)
+      Column.builderFor(f.name, dataType)
+        .nullable(f.nullable)
+        .comment(comment)
+        .defaultValue(defaultValue)
+        .metadata(metadataAsJson(cleanedMetadata))
+        .id(id)
+        .build()
     } else if (isGeneratedColumn) {
-      val cleanedMetadata = metadataWithKeysRemoved(
-        Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY))
-      Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
-        new 
GenerationExpression(GeneratedColumn.getGenerationExpression(f).get),
-        metadataAsJson(cleanedMetadata))
+      val cleanedMetadata = 
metadataWithKeysRemoved(REMOVED_GENERATED_COL_METADATA_KEYS)

Review Comment:
   I feel like the abstraction `REMOVED_GENERATED_COL_METADATA_KEYS` only means 
generated column ID, should we perhaps rewrite this to `FIELD_ID_METADATA_KEY` 
++ `REMOVED_GENERATED_COL_METADATA_KEYS` for clarity?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -721,6 +749,10 @@ private[sql] object CatalogV2Util {
       }.build()
     }
 
+    val id = if (keepFieldIds) f.id.orNull else null

Review Comment:
   Does `keepFieldIds` also mean keeping top-level col ID?



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -77,10 +79,7 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.mixedcolidcat",
       classOf[MixedColumnIdTableCatalog].getName)
     .set("spark.sql.catalog.mixedcolidcat.copyOnLoad", "true")
-    .set("spark.sql.catalog.composedidcat",
-      classOf[ComposedColumnIdTableCatalog].getName)
-    .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
-    .set(SQLConf.TIME_TYPE_ENABLED.key, "true")
+    .set(InMemoryBaseTable.ASSIGN_COLUMN_IDS, "true")

Review Comment:
   I assume we need this flag `ASSIGN_COLUMN_IDS` because there are scenarios 
where we want to test table ID or schema name/data type validation?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala:
##########


Review Comment:
   @aokolnychyi In that case, IMO it's clearer if we avoid setting a default 
value for `checkFieldIds`, so we force all current & future call-sites to be 
clear with the semantics



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to