szehon-ho commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3507912229
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala:
##########
@@ -1083,4 +1091,156 @@ abstract class MaterializeTablesSuite extends
BaseCoreExecutionTest {
}
assert(ex.cause.isInstanceOf[AnalysisException])
}
+
+ // =============== Table evolution in catalog tests ===============
+
+ private val recordingCatalogName = "recording_cat"
+ private val recordingNamespace = "rec_ns"
+
+ /**
+ * Registers [[RecordingInMemoryTableCatalog]] under `recordingCatalogName`,
resets its shared
+ * alter buffer and any cached catalog state, creates `recordingNamespace`,
runs `body`, then
+ * tears the registration back down.
+ */
+ private def withRecordingCatalog(body: => Unit): Unit = {
+ spark.conf.set(
+ s"spark.sql.catalog.$recordingCatalogName",
+ classOf[RecordingInMemoryTableCatalog].getName
+ )
+ RecordingInMemoryTableCatalog.reset()
+ try {
+ spark.sql(s"CREATE NAMESPACE IF NOT EXISTS
$recordingCatalogName.$recordingNamespace")
+ body
+ } finally {
+ RecordingInMemoryTableCatalog.reset()
+ spark.sessionState.catalogManager.reset()
+
spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$recordingCatalogName")
+ }
+ }
+
+ /**
+ * Materializes a single streaming table under the recording
catalog/namespace with the given
+ * schema and properties.
+ */
+ private def materializeStreamingTable(
+ name: String,
+ schema: StructType,
+ properties: Map[String, String]): Unit = {
+ // All nulls dummy row, compatible with any schema type
+ val row = Row.fromSeq(Seq.fill(schema.length)(null))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)),
schema)
+ materializeGraph(
+ new TestGraphRegistrationContext(spark) {
+ registerTable(
+ name,
+ query = Option(dfFlowFunc(df)),
+ specifiedSchema = Option(schema),
+ properties = properties,
+ catalog = Option(recordingCatalogName),
+ database = Option(recordingNamespace)
+ )
+ }.resolveToDataflowGraph(),
+ storageRoot = storageRoot
+ )
+ }
+
+ private def loadTableFromRecordingCatalog(name: String): V2Table = {
+ val catalog = spark.sessionState.catalogManager
+ .catalog(recordingCatalogName)
+ .asInstanceOf[TableCatalog]
+ catalog.loadTable(Identifier.of(Array(recordingNamespace), name))
+ }
+
+ test("re-materializing an unchanged table does not issue an alterTable") {
+ withRecordingCatalog {
+ val schema = new StructType().add("id", IntegerType).add("value",
StringType)
+ val props = Map("p.a" -> "1", "p.b" -> "2")
+ // Creating the table issues no alter, and re-materializing the
unchanged table is a no-op,
+ // so no alter is ever recorded.
+ materializeStreamingTable("t", schema, props)
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+ materializeStreamingTable("t", schema, props)
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+ }
+ }
+
+ test("re-materializing with changed/new properties issues an alterTable that
sets them") {
+ withRecordingCatalog {
+ val schema = new StructType().add("id", IntegerType).add("value",
StringType)
+ // Creating the table issues no alter; re-materializing with
changed/added properties issues
+ // exactly one alter that sets them.
+ materializeStreamingTable("t", schema, Map("p.a" -> "1"))
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+ materializeStreamingTable("t", schema, Map("p.a" -> "2", "p.new" -> "n"))
+ assert(RecordingInMemoryTableCatalog.recordedAlters.size == 1)
+
+ val changes = RecordingInMemoryTableCatalog.recordedAlters.flatten
+ assert(changes.forall(_.isInstanceOf[TableChange.SetProperty]))
+ val set = changes.collect {
+ case s: TableChange.SetProperty => s.property() -> s.value()
+ }.toMap
+ assert(set == Map("p.a" -> "2", "p.new" -> "n"))
+
+ val table = loadTableFromRecordingCatalog("t")
+ assert(table.properties().get("p.a") == "2")
+ assert(table.properties().get("p.new") == "n")
+ }
+ }
+
+ test("re-materializing with an added column issues an alterTable") {
+ withRecordingCatalog {
+ // Creating the table issues no alter; re-materializing with an added
column issues exactly
+ // one alter that adds it.
+ materializeStreamingTable("t", new StructType().add("id", IntegerType),
Map("p.a" -> "1"))
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+ materializeStreamingTable(
+ "t",
+ new StructType().add("id", IntegerType).add("value", StringType),
+ Map("p.a" -> "1")
+ )
+ assert(RecordingInMemoryTableCatalog.recordedAlters.size == 1)
+
+ val changes = RecordingInMemoryTableCatalog.recordedAlters.flatten
+ assert(changes.exists(_.isInstanceOf[TableChange.AddColumn]))
+
+ assert(
+ loadTableFromRecordingCatalog("t").columns() sameElements
+ CatalogV2Util.structTypeToV2Columns(
+ new StructType().add("id", IntegerType).add("value", StringType)
+ )
+ )
+ }
+ }
+
+ test("re-materializing with a dropped property neither removes it nor issues
an alterTable") {
+ withRecordingCatalog {
+ val schema = new StructType().add("id", IntegerType)
+ // This test locks in the current buggy behavior where dropped
properties do not materialize
+ // against the catalog table entity. See SPARK-57670.
+ materializeStreamingTable("t", schema, Map("p.keep" -> "v", "p.stale" ->
"old"))
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+ materializeStreamingTable("t", schema, Map("p.keep" -> "v"))
+ assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+
+ assert(loadTableFromRecordingCatalog("t").properties().get("p.stale") ==
"old")
+ }
+ }
+}
+
+/**
+ * An [[InMemoryTableCatalog]] that records every `alterTable` invocation
(into the shared companion
+ * buffer) while still applying it, so tests can assert whether
materialization issued an alter or
+ * skipped it as a no-op.
+ */
+class RecordingInMemoryTableCatalog extends InMemoryTableCatalog {
+ override def alterTable(ident: Identifier, changes: TableChange*): V2Table =
{
+ RecordingInMemoryTableCatalog.recordedAlters += changes.toSeq
+ super.alterTable(ident, changes: _*)
+ }
+}
+
+object RecordingInMemoryTableCatalog {
+ val recordedAlters: mutable.ArrayBuffer[Seq[TableChange]] =
+ mutable.ArrayBuffer.empty
+ def reset(): Unit = recordedAlters.clear()
}
Review Comment:
`recordedAlters` is global companion-object state, which only stays safe as
long as nothing that uses this class ever runs concurrently in the same JVM.
Since `CatalogManager` caches plugins by name (`catalogs.getOrElseUpdate(name,
Catalogs.load(name, conf))`), every access within a `withRecordingCatalog`
block resolves to the *same* instance, so the buffer can be per-instance and
read back off the cached catalog. That removes the global state (and the need
for `reset()`) entirely:
```suggestion
/**
 * An [[InMemoryTableCatalog]] that records every `alterTable` invocation (into a per-instance
 * buffer) while still applying it, so tests can assert whether materialization issued an alter or
 * skipped it as a no-op. `CatalogManager` caches catalog plugins by name, so the same instance
 * backs every access within a `withRecordingCatalog` block and reads go through that cached
 * instance.
 */
class RecordingInMemoryTableCatalog extends InMemoryTableCatalog {
  val recordedAlters: mutable.ArrayBuffer[Seq[TableChange]] =
    mutable.ArrayBuffer.empty
  override def alterTable(ident: Identifier, changes: TableChange*): V2Table = {
    recordedAlters += changes.toSeq
    super.alterTable(ident, changes: _*)
  }
}
```
This needs a few matching changes on the test side:
- Add an accessor next to `loadTableFromRecordingCatalog`:
```scala
private def recordingCatalog: RecordingInMemoryTableCatalog =
  spark.sessionState.catalogManager
    .catalog(recordingCatalogName)
    .asInstanceOf[RecordingInMemoryTableCatalog]
```
- Replace the `RecordingInMemoryTableCatalog.recordedAlters` references in
the assertions with `recordingCatalog.recordedAlters`.
- Drop both `RecordingInMemoryTableCatalog.reset()` calls in
`withRecordingCatalog` — the `catalogManager.reset()` in the `finally` already
clears the plugin cache, so the next block constructs a fresh instance with an
empty buffer — and update the "resets its shared alter buffer" line in that
method's scaladoc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]