AnishMahto commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3465641543


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala:
##########
@@ -351,6 +349,80 @@ object DatasetManager extends Logging {
     )
   }
 
+  /** Loads the table at `identifier` from `catalog`, or `None` if it does not exist. */
+  private def loadTableIfExists(
+      catalog: TableCatalog,
+      identifier: Identifier): Option[V2Table] = {
+    Option.when(catalog.tableExists(identifier))(catalog.loadTable(identifier))
+  }
+
+  /**
+   * Creates the table at `identifier` with the given schema, properties, and partition/cluster
+   * transforms. Used when no table yet exists at the identifier.
+   *
+   * @param schema     the schema to create the table with.
+   * @param properties the table properties to create the table with.
+   * @param transforms the partition/cluster transforms to create the table with.
+   */
+  private def createTable(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      schema: StructType,
+      properties: Map[String, String],
+      transforms: Seq[Transform]): Unit = {
+    catalog.createTable(
+      tableIdentifier,
+      new TableInfo.Builder()
+        .withProperties(properties.asJava)
+        .withColumns(CatalogV2Util.structTypeToV2Columns(schema))
+        .withPartitions(transforms.toArray)
+        .build()
+    )
+  }
+
+  /**
+   * Evolves the already-existing `existingTable` at `identifier` in place by diffing its schema
+   * and properties, skipping the catalog `alterTable` entirely when nothing actually changes.
+   * Partitioning/clustering cannot change in place, so no transforms are accepted here.
+   *
+   * @param existingTable           the currently materialized table.
+   * @param desiredSchema           the schema the table should have as computed in the current
+   *                                execution (the user-specified or inferred schema). This is the
+   *                                "incoming" side and may differ from `existingTable`'s recorded
+   *                                schema due to schema evolution across runs.
+   * @param properties              the table properties to (re)set on evolve.
+   * @param mergeWithExistingSchema whether the effective schema is the merge of the existing and
+   *                                desired schemas (additive evolution) rather than the desired
+   *                                schema as-is.
+   */
+  private def evolveTable(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      existingTable: V2Table,
+      desiredSchema: StructType,
+      properties: Map[String, String],
+      mergeWithExistingSchema: Boolean): Unit = {
+    val currentSchema = v2ColumnsToStructType(existingTable.columns())
+    val targetSchema = if (mergeWithExistingSchema) {
+      SchemaMergingUtils.mergeSchemas(currentSchema, desiredSchema)
+    } else {
+      desiredSchema
+    }
+    val columnChanges = diffSchemas(currentSchema, targetSchema)
+
+    val existingProperties = existingTable.properties()
+    val propertyChanges = properties.collect {

Review Comment:
   This is a really good question; I had to spend some time thinking about it:
   1. No, AFAICT we do not handle property removal at all. If a user drops
   properties specified in the `dp.table` decorator, they are not dropped from
   the catalog table. As currently implemented, properties can only be added,
   not removed.
   2. Agreed that this PR does not change that behavior, but it is definitely
   a bug.
   3. Fixing this is non-trivial. Originally I wanted to simply diff the
   existing table properties against the table properties declared for the
   current pipeline run. But I realized that a number of table properties are
   set automatically by the catalog and various APIs, even if the user has not
   explicitly declared them in the `dp.table` decorator (e.g.
   `clusteringColumns`). So even if we find properties that are set on the
   existing catalog table entry but not declared in the decorator for the
   current pipeline run, we can't tell whether the user explicitly set them on
   a previous run or the catalog set them internally.
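
   To illustrate the ambiguity in point 3, here is a minimal sketch using
   plain Scala maps rather than the catalog API. `naiveRemovals`, the
   `owner.team` property, and the property values are hypothetical and only
   for illustration; this is not code from the PR.

   ```scala
   // Hypothetical sketch: a naive "removal" diff over plain maps.
   object PropertyDiffSketch {
     /** Keys present on the existing table but absent from the current declaration. */
     def naiveRemovals(
         existing: Map[String, String],
         declared: Map[String, String]): Set[String] =
       existing.keySet -- declared.keySet

     def main(args: Array[String]): Unit = {
       // Existing catalog entry: one property the user declared on an earlier run
       // (illustrative name) and one assumed to be set by the catalog itself.
       val existing = Map(
         "owner.team" -> "data-eng",
         "clusteringColumns" -> "illustrative-value")
       // Current run: the user has dropped `owner.team` from the decorator.
       val declared = Map.empty[String, String]

       // Both keys come back as removal candidates; nothing in the diff says
       // which one the user originally set and which one the catalog manages.
       println(naiveRemovals(existing, declared))
     }
   }
   ```

   Acting on this diff would also try to drop the catalog-managed property,
   which is why a plain existing-vs-declared diff is not enough.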
   
   Fixing this bug is difficult; I think a true fix might require persisting
   the previously declared table properties somewhere. For this PR, I'm going
   to create a Jira issue and link it here as a TODO, but I'm not going to
   attempt the fix right now.
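
   For the follow-up, a rough sketch of what the diff could look like if the
   previously declared properties were available. Where they would be
   persisted is left open; `previouslyDeclared`, `PropertyChanges`, and `diff`
   are hypothetical names, and this works on plain maps rather than the
   catalog API.

   ```scala
   // Hypothetical sketch: assumes the properties declared on the previous run
   // were persisted somewhere and can be passed in as `previouslyDeclared`.
   object DeclaredPropertyTracking {
     final case class PropertyChanges(toSet: Map[String, String], toRemove: Set[String])

     def diff(
         previouslyDeclared: Map[String, String],
         declared: Map[String, String],
         existing: Map[String, String]): PropertyChanges = {
       // Set only the declared properties whose value is missing or different.
       val toSet = declared.filter { case (key, value) => !existing.get(key).contains(value) }
       // Remove only keys the user declared before and has since dropped.
       // Keys that were never user-declared are left alone.
       val toRemove = (previouslyDeclared.keySet -- declared.keySet).intersect(existing.keySet)
       PropertyChanges(toSet, toRemove)
     }
   }
   ```

   With this shape, a property that only the catalog ever set never appears in
   `previouslyDeclared`, so it is never a removal candidate.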



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


