AnishMahto commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3470301673
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala:
##########
@@ -351,6 +349,70 @@ object DatasetManager extends Logging {
)
}
+ /**
+ * Loads the table at `identifier` from `catalog`, or returns `None` if it does not exist.
+ *
+ * Existence is checked with `catalog.tableExists` first; `catalog.loadTable` is only invoked
+ * when that check returns true.
+ *
+ * NOTE(review): the existence check and the load are two separate catalog calls, so a table
+ * dropped between them would presumably surface as an exception from `loadTable` rather than
+ * as `None` -- confirm whether callers need to tolerate that.
+ *
+ * @param catalog the catalog to query.
+ * @param identifier the identifier of the table to look up.
+ * @return the loaded table wrapped in `Some`, or `None` when `tableExists` reports false.
+ */
+ private def loadTableIfExists(
+ catalog: TableCatalog,
+ identifier: Identifier): Option[CatalogTable] = {
+ Option.when(catalog.tableExists(identifier))(catalog.loadTable(identifier))
+ }
+
+ /**
+ * Creates the table at `tableIdentifier` in `catalog` with the given schema, properties, and
+ * partition/cluster transforms. Used when no table yet exists at the identifier.
+ *
+ * The table definition is assembled with `TableInfo.Builder`: `properties` is passed as a Java
+ * map, `schema` is converted to V2 columns via `CatalogV2Util.structTypeToV2Columns`, and
+ * `transforms` is passed as the partitioning array.
+ *
+ * @param catalog the catalog on which `createTable` is invoked.
+ * @param tableIdentifier the identifier of the table to create.
+ * @param schema the schema to create the table with.
+ * @param properties the table properties to create the table with.
+ * @param transforms the partition/cluster transforms to create the table with.
+ */
+ private def createTableInCatalog(
+ catalog: TableCatalog,
+ tableIdentifier: Identifier,
+ schema: StructType,
+ properties: Map[String, String],
+ transforms: Seq[Transform]): Unit = {
+ catalog.createTable(
+ tableIdentifier,
+ new TableInfo.Builder()
+ .withProperties(properties.asJava)
+ .withColumns(CatalogV2Util.structTypeToV2Columns(schema))
+ .withPartitions(transforms.toArray)
+ .build()
+ )
+ }
+
+ /**
+ * Evolves the already-existing `existingTable` at `tableIdentifier` in place by diffing its
+ * schema and (re)setting its properties. Partitioning/clustering cannot change in place, so no
+ * transforms are accepted here.
+ *
+ * @param existingTable the currently materialized table.
+ * @param desiredSchema the schema the table should have as computed in the current
+ * execution (the user-specified or inferred schema). This is the
+ * "incoming" side and may differ from `existingTable`'s recorded
+ * schema due to schema evolution across runs.
+ * @param properties the table properties to (re)set on evolve.
+ * @param mergeWithExistingSchema whether the effective schema is the merge of the existing and
+ * desired schemas (additive evolution) rather than the desired
+ * schema as-is.
+ */
+ private def evolveTableInCatalog(
+ catalog: TableCatalog,
+ tableIdentifier: Identifier,
+ existingTable: CatalogTable,
+ desiredSchema: StructType,
+ properties: Map[String, String],
+ mergeWithExistingSchema: Boolean): Unit = {
+ val currentSchema = v2ColumnsToStructType(existingTable.columns())
+ val targetSchema = if (mergeWithExistingSchema) {
+ SchemaMergingUtils.mergeSchemas(currentSchema, desiredSchema)
+ } else {
+ desiredSchema
+ }
+ val columnChanges = diffSchemas(currentSchema, targetSchema)
+ val setProperties = properties.map { case (k, v) =>
TableChange.setProperty(k, v) }
+ catalog.alterTable(tableIdentifier, (columnChanges ++
setProperties).toArray: _*)
Review Comment:
I ended up adding unit tests using a stubbed catalog.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]