longvu-db commented on code in PR #56619:
URL: https://github.com/apache/spark/pull/56619#discussion_r3474657759
##########
sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala:
##########
@@ -243,6 +244,43 @@ case class StructField(
metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
}
+ /**
+ * Updates the field with an ID for column identity tracking.
+ */
+ def withId(id: String): StructField = {
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(metadata)
+ .putString(FIELD_ID_METADATA_KEY, id)
+ .build()
+ copy(metadata = newMetadata)
+ }
+
+ /**
+ * Returns the ID of this field, if set.
+ */
+ def id: Option[String] = {
+ if (metadata.contains(FIELD_ID_METADATA_KEY)) {
+ Some(metadata.getString(FIELD_ID_METADATA_KEY))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Returns a copy of this field with the field ID removed, or this field if
no ID is set.
+ */
+ def clearId(): StructField = {
Review Comment:
1. For consistency, I think we should either return a copy in both cases or
modify in place in both cases.
2. P.S.: when someone calls `field.clearId()`, in-place modification feels more
intuitive to me, but I'm okay with this returning a copy.
3. Also, suppose someone does:
a. `fieldCopy = field.clearId()` (where no ID is set)
b. `modification_to_the_fieldCopy`
Since we currently return the field itself in that case, would
`modification_to_the_fieldCopy` modify both `fieldCopy` and `field`, as a sort
of indirect modification?
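To make question 3 concrete, here is a minimal Java sketch of the aliasing scenario. `Field` is a hypothetical immutable stand-in, not Spark's `StructField`; the assumption is that the real class behaves the same way because it is a case class whose "modifications" go through `copy`. Under that assumption, returning the same instance when no ID is set should not allow indirect modification.

```java
import java.util.Optional;

// Hypothetical immutable stand-in for a field; NOT Spark's StructField.
record Field(String name, Optional<String> id) {
    // Returns a new Field carrying the given ID; the receiver is unchanged.
    Field withId(String newId) {
        return new Field(name, Optional.of(newId));
    }

    // Returns a copy without the ID, or this same instance if no ID is set.
    Field clearId() {
        return id.isPresent() ? new Field(name, Optional.empty()) : this;
    }
}

class FieldIdDemo {
    public static void main(String[] args) {
        Field field = new Field("age", Optional.empty());
        Field fieldCopy = field.clearId();        // no ID set: same instance
        Field modified = fieldCopy.withId("42");  // "modification" builds a new object

        System.out.println(fieldCopy == field);      // true
        System.out.println(field.id().isPresent());  // false: original untouched
        System.out.println(modified.id().get());     // 42
    }
}
```

If this holds for `StructField` as well, the remaining concern is only the API consistency raised in point 1, not correctness.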
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression
columnGenerationExpression() {
* others.
* <p>
* This API covers top-level columns only. Nested struct fields, array
elements, and map
- * keys/values do not have separate IDs. Connectors that track nested field
IDs can encode
- * them into the returned top-level Column ID string to detect nested
changes, since Spark
- * only compares string equality.
+ * keys/values carry their own IDs in struct field metadata. Spark validates
both top-level and
+ * nested field IDs as part of schema compatibility checks. See {@link
StructField#id()}.
Review Comment:
```suggestion
* nested struct field IDs as part of schema compatibility checks (validation
of array elements and map keys/values is not supported yet). See {@link
StructField#id()}.
```
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression
columnGenerationExpression() {
* others.
* <p>
* This API covers top-level columns only. Nested struct fields, array
elements, and map
- * keys/values do not have separate IDs. Connectors that track nested field
IDs can encode
- * them into the returned top-level Column ID string to detect nested
changes, since Spark
- * only compares string equality.
+ * keys/values carry their own IDs in struct field metadata. Spark validates
both top-level and
+ * nested field IDs as part of schema compatibility checks. See {@link
StructField#id()}.
*/
@Nullable
default String id() {
return null;
}
+
+ /**
+ * A builder for {@link Column}.
+ *
+ * @since 4.2.0
+ */
+ class Builder {
+ private final String name;
+ private DataType dataType;
+ private boolean nullable = true;
+ private String comment = null;
+ private ColumnDefaultValue defaultValue = null;
+ private GenerationExpression genExpr = null;
+ private IdentityColumnSpec identityColumnSpec = null;
+ private String metadataInJSON = null;
+ private String id = null;
+
+ private Builder(String name, DataType dataType) {
+ this.name = Objects.requireNonNull(name, "name must not be null");
+ this.dataType = Objects.requireNonNull(dataType, "dataType must not be
null");
+ }
+
+ public Builder nullable(boolean nullable) {
+ this.nullable = nullable;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder defaultValue(ColumnDefaultValue defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public Builder generationExpression(String sql) {
+ this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+ return this;
+ }
+
+ public Builder generationExpression(GenerationExpression generationExpr) {
+ this.genExpr = generationExpr;
+ return this;
+ }
+
+ public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+ this.identityColumnSpec = identityColumnSpec;
+ return this;
+ }
+
+ public Builder metadata(String metadataInJSON) {
+ this.metadataInJSON = metadataInJSON;
+ return this;
+ }
+
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public Builder clearFieldIds() {
+ this.id = null;
Review Comment:
Is it a problem if the `clearId` above returns a copy, but `clearFieldIds`
here doesn't?
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression
columnGenerationExpression() {
* others.
* <p>
* This API covers top-level columns only. Nested struct fields, array
elements, and map
- * keys/values do not have separate IDs. Connectors that track nested field
IDs can encode
- * them into the returned top-level Column ID string to detect nested
changes, since Spark
- * only compares string equality.
+ * keys/values carry their own IDs in struct field metadata. Spark validates
both top-level and
+ * nested field IDs as part of schema compatibility checks. See {@link
StructField#id()}.
*/
@Nullable
default String id() {
return null;
}
+
+ /**
+ * A builder for {@link Column}.
+ *
+ * @since 4.2.0
+ */
+ class Builder {
+ private final String name;
+ private DataType dataType;
+ private boolean nullable = true;
+ private String comment = null;
+ private ColumnDefaultValue defaultValue = null;
+ private GenerationExpression genExpr = null;
+ private IdentityColumnSpec identityColumnSpec = null;
+ private String metadataInJSON = null;
+ private String id = null;
+
+ private Builder(String name, DataType dataType) {
+ this.name = Objects.requireNonNull(name, "name must not be null");
+ this.dataType = Objects.requireNonNull(dataType, "dataType must not be
null");
+ }
+
+ public Builder nullable(boolean nullable) {
+ this.nullable = nullable;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder defaultValue(ColumnDefaultValue defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public Builder generationExpression(String sql) {
+ this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+ return this;
+ }
+
+ public Builder generationExpression(GenerationExpression generationExpr) {
+ this.genExpr = generationExpr;
+ return this;
+ }
+
+ public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+ this.identityColumnSpec = identityColumnSpec;
+ return this;
+ }
+
+ public Builder metadata(String metadataInJSON) {
+ this.metadataInJSON = metadataInJSON;
+ return this;
+ }
+
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public Builder clearFieldIds() {
Review Comment:
`clearFieldIds` -> Should we rename it to `clearAllIds` or
`clearColAndFieldIDs`?
IMO the current name can imply that only the nested field IDs are cleared.
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java:
##########
@@ -243,12 +227,103 @@ default GenerationExpression
columnGenerationExpression() {
* others.
* <p>
* This API covers top-level columns only. Nested struct fields, array
elements, and map
- * keys/values do not have separate IDs. Connectors that track nested field
IDs can encode
- * them into the returned top-level Column ID string to detect nested
changes, since Spark
- * only compares string equality.
+ * keys/values carry their own IDs in struct field metadata. Spark validates
both top-level and
+ * nested field IDs as part of schema compatibility checks. See {@link
StructField#id()}.
*/
@Nullable
default String id() {
return null;
}
+
+ /**
+ * A builder for {@link Column}.
+ *
+ * @since 4.2.0
+ */
+ class Builder {
+ private final String name;
+ private DataType dataType;
+ private boolean nullable = true;
+ private String comment = null;
+ private ColumnDefaultValue defaultValue = null;
+ private GenerationExpression genExpr = null;
+ private IdentityColumnSpec identityColumnSpec = null;
+ private String metadataInJSON = null;
+ private String id = null;
+
+ private Builder(String name, DataType dataType) {
+ this.name = Objects.requireNonNull(name, "name must not be null");
+ this.dataType = Objects.requireNonNull(dataType, "dataType must not be
null");
+ }
+
+ public Builder nullable(boolean nullable) {
+ this.nullable = nullable;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder defaultValue(ColumnDefaultValue defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+
+ public Builder generationExpression(String sql) {
+ this.genExpr = sql != null ? new GenerationExpression(sql) : null;
+ return this;
+ }
+
+ public Builder generationExpression(GenerationExpression generationExpr) {
+ this.genExpr = generationExpr;
+ return this;
+ }
+
+ public Builder identityColumnSpec(IdentityColumnSpec identityColumnSpec) {
+ this.identityColumnSpec = identityColumnSpec;
+ return this;
+ }
+
+ public Builder metadata(String metadataInJSON) {
+ this.metadataInJSON = metadataInJSON;
+ return this;
+ }
+
+ public Builder id(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public Builder clearFieldIds() {
+ this.id = null;
+ this.dataType = SchemaUtils.clearFieldIds(dataType);
+ return this;
+ }
+
+ public Column build() {
+ validateState();
+ return new ColumnImpl(
+ name, dataType, nullable, comment, defaultValue,
+ genExpr, identityColumnSpec, metadataInJSON, id);
+ }
+
+ private void validateState() {
+ if (hasConflictingDefinitions()) {
+ throw new SparkIllegalArgumentException(
+ "INTERNAL_ERROR",
+ Map.of("message",
+ "Column '" + name + "' cannot have more than one definition
of: " +
+ "default value, generation expression, identity column spec"));
Review Comment:
I assume this error shouldn't happen in practice, so we cannot test it, right?
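That said, since the builder setters in this hunk are public, the conflicting state may be reachable by a caller that sets two definitions before `build()`. The sketch below uses a hypothetical `MiniColumnBuilder` (not the real `Column.Builder`, and with a plain `IllegalArgumentException` instead of `SparkIllegalArgumentException`) to show how such a test could look, assuming the real builder can be obtained in tests.

```java
// Hypothetical miniature of a column builder; all names are illustrative only.
class MiniColumnBuilder {
    private final String name;
    private String defaultValue;
    private String generationExpression;

    MiniColumnBuilder(String name) {
        this.name = name;
    }

    MiniColumnBuilder defaultValue(String value) {
        this.defaultValue = value;
        return this;
    }

    MiniColumnBuilder generationExpression(String sql) {
        this.generationExpression = sql;
        return this;
    }

    // Rejects a column that has both a default value and a generation expression.
    String build() {
        if (defaultValue != null && generationExpression != null) {
            throw new IllegalArgumentException(
                "Column '" + name + "' cannot have more than one definition of: "
                    + "default value, generation expression");
        }
        return name;
    }
}

class BuilderValidationDemo {
    // Returns true if build() rejects the builder's current state.
    static boolean buildFails(MiniColumnBuilder builder) {
        try {
            builder.build();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        MiniColumnBuilder ok = new MiniColumnBuilder("salary").defaultValue("0");
        MiniColumnBuilder bad = new MiniColumnBuilder("salary")
            .defaultValue("0")
            .generationExpression("id + 1");
        System.out.println(buildFails(ok));   // false
        System.out.println(buildFails(bad));  // true
    }
}
```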
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -699,15 +721,21 @@ private[sql] object CatalogV2Util {
/**
* Converts a StructType to DS v2 columns, which decodes the StructField
metadata to v2 column
- * comment and default value or generation expression. This is mainly used
to generate DS v2
- * columns from table schema in DDL commands, so that Spark can pass DS v2
columns to DS v2
- * createTable and related APIs.
+ * comment, default value or generation expression, and column ID. This is
mainly used to
+ * generate DS v2 columns from table schema in DDL commands, so that Spark
can pass DS v2
+ * columns to DS v2 createTable and related APIs.
*/
- def structTypeToV2Columns(schema: StructType): Array[Column] = {
- schema.fields.map(structFieldToV2Column)
+ def structTypeToV2Columns(
+ schema: StructType,
+ keepFieldIds: Boolean = true): Array[Column] = {
+ schema.fields.map(structFieldToV2Column(_, keepFieldIds))
+ }
+
+ def clearFieldIds(columns: Array[Column]): Array[Column] = {
Review Comment:
Ditto, I wonder if we should use a different name if we are also clearing the
top-level ID.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -721,6 +749,10 @@ private[sql] object CatalogV2Util {
}.build()
}
+ val id = if (keepFieldIds) f.id.orNull else null
Review Comment:
```suggestion
val topLevelColId = if (keepFieldIds) f.id.orNull else null
```
Nit: currently it's a bit confusing to reason about whether something is meant
for the top-level col ID, the nested struct ID, or both. I think a clearer name
would help.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2IncrementallyConstructedQueryTests.scala:
##########
@@ -449,10 +449,10 @@ trait DSv2IncrementallyConstructedQueryTests extends
DSv2ExternalMutationTestBas
//
---------------------------------------------------------------------------
// Scenario 6: external type change (drop INT column, add STRING column).
- // The delete removes the old column ID and the add assigns a fresh one,
- // so the column ID check fires (COLUMN_ID_MISMATCH) in classic before schema
- // validation gets a chance to compare data types.
- // Connect re-resolves both sides with the new column ID.
+ // The delete removes the old column and the add assigns a fresh one,
+ // so COLUMNS_MISMATCH fires in classic before schema validation gets a
Review Comment:
Schema validation is part of the `COLUMNS_MISMATCH` check, so saying that
`COLUMNS_MISMATCH` fires before schema validation can be confusing. I think we
should instead say "so the column ID check fires".
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2552,15 +2478,14 @@ class DataSourceV2DataFrameSuite
assert(typeAfter.toString.startsWith("StructType(StructField(age"),
s"age should be first field after reorder, got: $typeAfter")
- // Position-based keys: each ordinal position keeps its old ID after
- // reorder, so the composed string is unchanged despite the schema
change.
+ // The top-level column ID is preserved after reorder.
assert(idBefore == idAfter,
- s"Composed ID should be unchanged after reorder: $idBefore vs
$idAfter")
+ s"Top-level column ID should be unchanged after reorder: $idBefore vs
$idAfter")
}
}
- test("composed nested IDs tolerate nested field reorder end-to-end") {
- val t = "composedidcat.ns1.ns2.tbl"
+ test("nested field reorder does not trigger column ID mismatch") {
Review Comment:
> // InMemoryTable does not actually reorder nested struct fields in stored
> // data, so the read still returns the original field order. This is fine
> // because the purpose of this test is to verify that the column ID check

Based on the comment, `InMemoryTable` does not actually do the reordering,
which is a limitation.
=> Let's try to fix `InMemoryTable` so that it actually does the reordering?

> val nameParts = colPath :+ field.name
> if (checkFieldIds) {
>   for (id <- field.id; otherId <- otherField.id if id != otherId) {
>     errors += s"${nameParts.fullyQuoted} field ID has changed from $id to $otherId"
>   }
> }

I ask because schema validation matches nested columns by name, not by
position in the struct.
=> I'm fine if this name-matching mechanism is already tested in some way.
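For reference, here is a small Java sketch of what I mean by name-based matching. `NestedField` and `idMismatches` are hypothetical names, not the actual validation code; the sketch only assumes that fields are paired by name and that an ID is compared only when both sides have one, as the quoted snippet suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical nested field: a name plus an optional ID (null when unset).
record NestedField(String name, String id) {}

class FieldIdCheck {
    // Pairs fields by name (not by position) and reports IDs that differ.
    static List<String> idMismatches(List<NestedField> before, List<NestedField> after) {
        Map<String, NestedField> afterByName = new HashMap<>();
        for (NestedField f : after) {
            afterByName.put(f.name(), f);
        }
        List<String> errors = new ArrayList<>();
        for (NestedField f : before) {
            NestedField other = afterByName.get(f.name());
            // Compare only when the field exists on both sides and both IDs are set.
            if (other != null && f.id() != null && other.id() != null
                    && !f.id().equals(other.id())) {
                errors.add(f.name() + " field ID has changed from "
                    + f.id() + " to " + other.id());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<NestedField> original =
            List.of(new NestedField("name", "1"), new NestedField("age", "2"));
        List<NestedField> reordered =
            List.of(new NestedField("age", "2"), new NestedField("name", "1"));
        List<NestedField> recreated =
            List.of(new NestedField("name", "1"), new NestedField("age", "7"));

        System.out.println(idMismatches(original, reordered));  // []
        System.out.println(idMismatches(original, recreated));
    }
}
```

With this kind of matching a pure reorder produces no errors, which is why a test on a table that really reorders its nested fields would exercise the name lookup rather than pass trivially.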
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3009,12 +3009,6 @@
"<errors>"
]
},
- "COLUMN_ID_MISMATCH" : {
Review Comment:
Like we have ComposedColumnIdTableCatalog,
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala,....
where we mock various connector scenarios, e.g. a connector that provides
top-level column IDs only and no struct column IDs, a connector that provides
both top-level col IDs and struct col IDs,....
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala:
##########
@@ -200,6 +200,20 @@ class ShowCreateTableSuite extends
command.ShowCreateTableSuiteBase with Command
}
}
+ test("SPARK-57544: show create table does not expose column IDs; schema
does") {
+ withNamespaceAndTable(ns, table) { t =>
+ sql(s"CREATE TABLE $t (id INT, salary INT) $defaultUsing")
+
+ // Column IDs assigned by the catalog must NOT appear in SHOW CREATE
TABLE output.
+ val showDDL = getShowCreateDDL(t)
+ assert(!showDDL.exists(_.contains("__FIELD_ID")), s"command must not
expose column IDs")
Review Comment:
```suggestion
assert(!showDDL.exists(_.contains(FIELD_ID_METADATA_KEY)), s"command
must not expose column IDs")
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -699,15 +721,21 @@ private[sql] object CatalogV2Util {
/**
* Converts a StructType to DS v2 columns, which decodes the StructField
metadata to v2 column
- * comment and default value or generation expression. This is mainly used
to generate DS v2
- * columns from table schema in DDL commands, so that Spark can pass DS v2
columns to DS v2
- * createTable and related APIs.
+ * comment, default value or generation expression, and column ID. This is
mainly used to
+ * generate DS v2 columns from table schema in DDL commands, so that Spark
can pass DS v2
+ * columns to DS v2 createTable and related APIs.
*/
- def structTypeToV2Columns(schema: StructType): Array[Column] = {
- schema.fields.map(structFieldToV2Column)
+ def structTypeToV2Columns(
+ schema: StructType,
+ keepFieldIds: Boolean = true): Array[Column] = {
Review Comment:
```suggestion
keepFieldIds: Boolean): Array[Column] = {
```
Let's remove the default so that call sites are forced to adopt it deliberately?
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -615,6 +653,44 @@ class DataSourceV2SQLSuiteV1Filter
}
}
+ test("ReplaceTableAsSelect: field IDs in query schema are not propagated to
table columns") {
+ val basicCatalog = catalog("testcat").asTableCatalog
+ val atomicCatalog = catalog("testcat_atomic").asTableCatalog
+ val basicIdentifier = "testcat.table_name"
Review Comment:
Do we need to have withTable to clean up the `identifier`?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -736,33 +768,45 @@ private[sql] object CatalogV2Util {
assert(e.resolved && e.foldable,
"The existence default value must be a simple SQL string that is
resolved and " +
"foldable, but got: " + f.getExistenceDefaultValue().get)
- LiteralValue(e.eval(), f.dataType)
+ LiteralValue(e.eval(), dataType)
} else {
null
}
val defaultValue = new
ColumnDefaultValue(f.getCurrentDefaultValue().get, existsDefault)
- val cleanedMetadata = metadataWithKeysRemoved(
- Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY,
EXISTS_DEFAULT_COLUMN_METADATA_KEY))
- Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
defaultValue,
- metadataAsJson(cleanedMetadata))
+ val cleanedMetadata =
metadataWithKeysRemoved(REMOVED_DEFAULT_VALUE_METADATA_KEYS)
+ Column.builderFor(f.name, dataType)
+ .nullable(f.nullable)
+ .comment(comment)
+ .defaultValue(defaultValue)
+ .metadata(metadataAsJson(cleanedMetadata))
+ .id(id)
+ .build()
} else if (isGeneratedColumn) {
- val cleanedMetadata = metadataWithKeysRemoved(
- Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY))
- Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
- new
GenerationExpression(GeneratedColumn.getGenerationExpression(f).get),
- metadataAsJson(cleanedMetadata))
+ val cleanedMetadata =
metadataWithKeysRemoved(REMOVED_GENERATED_COL_METADATA_KEYS)
Review Comment:
I feel like the abstraction `REMOVED_GENERATED_COL_METADATA_KEYS` only means
generated column ID. Should we perhaps rewrite this to `FIELD_ID_METADATA_KEY`
++ `REMOVED_GENERATED_COL_METADATA_KEYS` for clarity?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##########
@@ -721,6 +749,10 @@ private[sql] object CatalogV2Util {
}.build()
}
+ val id = if (keepFieldIds) f.id.orNull else null
Review Comment:
Does `keepFieldIds` also mean keeping top-level col ID?
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -77,10 +79,7 @@ class DataSourceV2DataFrameSuite
.set("spark.sql.catalog.mixedcolidcat",
classOf[MixedColumnIdTableCatalog].getName)
.set("spark.sql.catalog.mixedcolidcat.copyOnLoad", "true")
- .set("spark.sql.catalog.composedidcat",
- classOf[ComposedColumnIdTableCatalog].getName)
- .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
- .set(SQLConf.TIME_TYPE_ENABLED.key, "true")
+ .set(InMemoryBaseTable.ASSIGN_COLUMN_IDS, "true")
Review Comment:
I assume we need this flag `ASSIGN_COLUMN_IDS` because there are scenarios
where we want to test table ID or schema name/data type validation?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala:
##########
Review Comment:
@aokolnychyi In that case, IMO it's clearer if we avoid setting a default
value for `checkFieldIds`, so that all current and future call sites are forced
to be explicit about the semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]