uros-b commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3462708588


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala:
##########
@@ -351,6 +349,70 @@ object DatasetManager extends Logging {
     )
   }
 
+  /** Loads the table at `identifier` from `catalog`, or `None` if it does not exist. */
+  private def loadTableIfExists(
+      catalog: TableCatalog,
+      identifier: Identifier): Option[CatalogTable] = {
+    Option.when(catalog.tableExists(identifier))(catalog.loadTable(identifier))
+  }
+
+  /**
+   * Creates the table at `identifier` with the given schema, properties, and partition/cluster
+   * transforms. Used when no table yet exists at the identifier.
+   *
+   * @param schema     the schema to create the table with.
+   * @param properties the table properties to create the table with.
+   * @param transforms the partition/cluster transforms to create the table with.
+   */
+  private def createTableInCatalog(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      schema: StructType,
+      properties: Map[String, String],
+      transforms: Seq[Transform]): Unit = {
+    catalog.createTable(
+      tableIdentifier,
+      new TableInfo.Builder()
+        .withProperties(properties.asJava)
+        .withColumns(CatalogV2Util.structTypeToV2Columns(schema))
+        .withPartitions(transforms.toArray)
+        .build()
+    )
+  }
+
+  /**
+   * Evolves the already-existing `existingTable` at `identifier` in place by diffing its schema and
+   * (re)setting its properties. Partitioning/clustering cannot change in place, so no transforms are
+   * accepted here.
+   *
+   * @param existingTable           the currently materialized table.
+   * @param desiredSchema           the schema the table should have as computed in the current
+   *                                execution (the user-specified or inferred schema). This is the
+   *                                "incoming" side and may differ from `existingTable`'s recorded
+   *                                schema due to schema evolution across runs.
+   * @param properties              the table properties to (re)set on evolve.
+   * @param mergeWithExistingSchema whether the effective schema is the merge of the existing and
+   *                                desired schemas (additive evolution) rather than the desired
+   *                                schema as-is.
+   */
+  private def evolveTableInCatalog(
+      catalog: TableCatalog,
+      tableIdentifier: Identifier,
+      existingTable: CatalogTable,
+      desiredSchema: StructType,
+      properties: Map[String, String],
+      mergeWithExistingSchema: Boolean): Unit = {
+    val currentSchema = v2ColumnsToStructType(existingTable.columns())
+    val targetSchema = if (mergeWithExistingSchema) {
+      SchemaMergingUtils.mergeSchemas(currentSchema, desiredSchema)
+    } else {
+      desiredSchema
+    }
+    val columnChanges = diffSchemas(currentSchema, targetSchema)
+    val setProperties = properties.map { case (k, v) => TableChange.setProperty(k, v) }
+    catalog.alterTable(tableIdentifier, (columnChanges ++ setProperties).toArray: _*)

Review Comment:
   Does `evolveTableInCatalog` call `alterTable` unconditionally, even when 
`columnChanges ++ setProperties` is empty? Is that effectively a no-op call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to