szehon-ho commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3463712085


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala:
##########
@@ -351,6 +349,80 @@ object DatasetManager extends Logging {
     )
   }
 
+  /** Loads the table at `identifier` from `catalog`, or `None` if it does not 
exist. */
+  private def loadTableIfExists(
+      catalog: TableCatalog,
+      identifier: Identifier): Option[V2Table] = {
+    Option.when(catalog.tableExists(identifier))(catalog.loadTable(identifier))
+  }
+
+  /**
+   * Creates the table at `identifier` with the given schema, properties, and 
partition/cluster
+   * transforms. Used when no table yet exists at the identifier.
+   *
+   * @param schema     the schema to create the table with.
+   * @param properties the table properties to create the table with.
+   * @param transforms the partition/cluster transforms to create the table 
with.
+   */
+  private def createTable(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      schema: StructType,
+      properties: Map[String, String],
+      transforms: Seq[Transform]): Unit = {
+    catalog.createTable(
+      tableIdentifier,
+      new TableInfo.Builder()
+        .withProperties(properties.asJava)
+        .withColumns(CatalogV2Util.structTypeToV2Columns(schema))
+        .withPartitions(transforms.toArray)
+        .build()
+    )
+  }
+
+  /**
+   * Evolves the already-existing `existingTable` at `identifier` in place by 
diffing its schema
+   * and properties, skipping the catalog `alterTable` entirely when nothing 
actually changes.
+   * Partitioning/clustering cannot change in place, so no transforms are 
accepted here.
+   *
+   * @param existingTable           the currently materialized table.
+   * @param desiredSchema           the schema the table should have as 
computed in the current
+   *                                execution (the user-specified or inferred 
schema). This is the
+   *                                "incoming" side and may differ from 
`existingTable`'s recorded
+   *                                schema due to schema evolution across runs.
+   * @param properties              the table properties to (re)set on evolve.
+   * @param mergeWithExistingSchema whether the effective schema is the merge 
of the existing and
+   *                                desired schemas (additive evolution) 
rather than the desired
+   *                                schema as-is.
+   */
+  private def evolveTable(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      existingTable: V2Table,
+      desiredSchema: StructType,
+      properties: Map[String, String],
+      mergeWithExistingSchema: Boolean): Unit = {
+    val currentSchema = v2ColumnsToStructType(existingTable.columns())
+    val targetSchema = if (mergeWithExistingSchema) {
+      SchemaMergingUtils.mergeSchemas(currentSchema, desiredSchema)
+    } else {
+      desiredSchema
+    }
+    val columnChanges = diffSchemas(currentSchema, targetSchema)
+
+    val existingProperties = existingTable.properties()
+    val propertyChanges = properties.collect {

Review Comment:
   had a question, do we handle property removal?  its not changed by this pr, 
but just curious



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to