szehon-ho commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3463712085
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala:
##########
@@ -351,6 +349,80 @@ object DatasetManager extends Logging {
)
}
+ /** Loads the table at `identifier` from `catalog`, or `None` if it does not exist. */
+ private def loadTableIfExists(
+ catalog: TableCatalog,
+ identifier: Identifier): Option[V2Table] = {
+ Option.when(catalog.tableExists(identifier))(catalog.loadTable(identifier))
+ }
+
+ /**
+ * Creates the table at `identifier` with the given schema, properties, and partition/cluster
+ * transforms. Used when no table yet exists at the identifier.
+ *
+ * @param schema the schema to create the table with.
+ * @param properties the table properties to create the table with.
+ * @param transforms the partition/cluster transforms to create the table with.
+ */
+ private def createTable(
+ catalog: TableCatalog,
+ tableIdentifier: Identifier,
+ schema: StructType,
+ properties: Map[String, String],
+ transforms: Seq[Transform]): Unit = {
+ catalog.createTable(
+ tableIdentifier,
+ new TableInfo.Builder()
+ .withProperties(properties.asJava)
+ .withColumns(CatalogV2Util.structTypeToV2Columns(schema))
+ .withPartitions(transforms.toArray)
+ .build()
+ )
+ }
+
+ /**
+ * Evolves the already-existing `existingTable` at `identifier` in place by diffing its schema
+ * and properties, skipping the catalog `alterTable` entirely when nothing actually changes.
+ * Partitioning/clustering cannot change in place, so no transforms are accepted here.
+ *
+ * @param existingTable the currently materialized table.
+ * @param desiredSchema the schema the table should have as computed in the current
+ * execution (the user-specified or inferred schema). This is the
+ * "incoming" side and may differ from `existingTable`'s recorded
+ * schema due to schema evolution across runs.
+ * @param properties the table properties to (re)set on evolve.
+ * @param mergeWithExistingSchema whether the effective schema is the merge of the existing and
+ * desired schemas (additive evolution) rather than the desired
+ * schema as-is.
+ */
+ private def evolveTable(
+ catalog: TableCatalog,
+ tableIdentifier: Identifier,
+ existingTable: V2Table,
+ desiredSchema: StructType,
+ properties: Map[String, String],
+ mergeWithExistingSchema: Boolean): Unit = {
+ val currentSchema = v2ColumnsToStructType(existingTable.columns())
+ val targetSchema = if (mergeWithExistingSchema) {
+ SchemaMergingUtils.mergeSchemas(currentSchema, desiredSchema)
+ } else {
+ desiredSchema
+ }
+ val columnChanges = diffSchemas(currentSchema, targetSchema)
+
+ val existingProperties = existingTable.properties()
+ val propertyChanges = properties.collect {
Review Comment:
I had a question: do we handle property removal? It's not changed by this PR,
but I'm just curious.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]