szehon-ho commented on code in PR #56684:
URL: https://github.com/apache/spark/pull/56684#discussion_r3507912229


##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala:
##########
@@ -1083,4 +1091,156 @@ abstract class MaterializeTablesSuite extends BaseCoreExecutionTest {
     }
     assert(ex.cause.isInstanceOf[AnalysisException])
   }
+
+  // =============== Table evolution in catalog tests ===============
+
+  private val recordingCatalogName = "recording_cat"
+  private val recordingNamespace = "rec_ns"
+
+  /**
+   * Registers [[RecordingInMemoryTableCatalog]] under `recordingCatalogName`, resets its shared
+   * alter buffer and any cached catalog state, creates `recordingNamespace`, runs `body`, then
+   * tears the registration back down.
+   */
+  private def withRecordingCatalog(body: => Unit): Unit = {
+    spark.conf.set(
+      s"spark.sql.catalog.$recordingCatalogName",
+      classOf[RecordingInMemoryTableCatalog].getName
+    )
+    RecordingInMemoryTableCatalog.reset()
+    try {
+      spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $recordingCatalogName.$recordingNamespace")
+      body
+    } finally {
+      RecordingInMemoryTableCatalog.reset()
+      spark.sessionState.catalogManager.reset()
+      spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$recordingCatalogName")
+    }
+  }
+
+  /**
+   * Materializes a single streaming table under the recording catalog/namespace with the given
+   * schema and properties.
+   */
+  private def materializeStreamingTable(
+      name: String,
+      schema: StructType,
+      properties: Map[String, String]): Unit = {
+    // All nulls dummy row, compatible with any schema type
+    val row = Row.fromSeq(Seq.fill(schema.length)(null))
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema)
+    materializeGraph(
+      new TestGraphRegistrationContext(spark) {
+        registerTable(
+          name,
+          query = Option(dfFlowFunc(df)),
+          specifiedSchema = Option(schema),
+          properties = properties,
+          catalog = Option(recordingCatalogName),
+          database = Option(recordingNamespace)
+        )
+      }.resolveToDataflowGraph(),
+      storageRoot = storageRoot
+    )
+  }
+
+  private def loadTableFromRecordingCatalog(name: String): V2Table = {
+    val catalog = spark.sessionState.catalogManager
+      .catalog(recordingCatalogName)
+      .asInstanceOf[TableCatalog]
+    catalog.loadTable(Identifier.of(Array(recordingNamespace), name))
+  }
+
+  test("re-materializing an unchanged table does not issue an alterTable") {
+    withRecordingCatalog {
+      val schema = new StructType().add("id", IntegerType).add("value", StringType)
+      val props = Map("p.a" -> "1", "p.b" -> "2")
+      // Creating the table issues no alter, and re-materializing the unchanged table is a no-op,
+      // so no alter is ever recorded.
+      materializeStreamingTable("t", schema, props)
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+      materializeStreamingTable("t", schema, props)
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+    }
+  }
+
+  test("re-materializing with changed/new properties issues an alterTable that sets them") {
+    withRecordingCatalog {
+      val schema = new StructType().add("id", IntegerType).add("value", StringType)
+      // Creating the table issues no alter; re-materializing with changed/added properties issues
+      // exactly one alter that sets them.
+      materializeStreamingTable("t", schema, Map("p.a" -> "1"))
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+      materializeStreamingTable("t", schema, Map("p.a" -> "2", "p.new" -> "n"))
+      assert(RecordingInMemoryTableCatalog.recordedAlters.size == 1)
+
+      val changes = RecordingInMemoryTableCatalog.recordedAlters.flatten
+      assert(changes.forall(_.isInstanceOf[TableChange.SetProperty]))
+      val set = changes.collect {
+        case s: TableChange.SetProperty => s.property() -> s.value()
+      }.toMap
+      assert(set == Map("p.a" -> "2", "p.new" -> "n"))
+
+      val table = loadTableFromRecordingCatalog("t")
+      assert(table.properties().get("p.a") == "2")
+      assert(table.properties().get("p.new") == "n")
+    }
+  }
+
+  test("re-materializing with an added column issues an alterTable") {
+    withRecordingCatalog {
+      // Creating the table issues no alter; re-materializing with an added column issues exactly
+      // one alter that adds it.
+      materializeStreamingTable("t", new StructType().add("id", IntegerType), Map("p.a" -> "1"))
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+      materializeStreamingTable(
+        "t",
+        new StructType().add("id", IntegerType).add("value", StringType),
+        Map("p.a" -> "1")
+      )
+      assert(RecordingInMemoryTableCatalog.recordedAlters.size == 1)
+
+      val changes = RecordingInMemoryTableCatalog.recordedAlters.flatten
+      assert(changes.exists(_.isInstanceOf[TableChange.AddColumn]))
+
+      assert(
+        loadTableFromRecordingCatalog("t").columns() sameElements
+          CatalogV2Util.structTypeToV2Columns(
+            new StructType().add("id", IntegerType).add("value", StringType)
+          )
+      )
+    }
+  }
+
+  test("re-materializing with a dropped property neither removes it nor issues an alterTable") {
+    withRecordingCatalog {
+      val schema = new StructType().add("id", IntegerType)
+      // This test locks in the current buggy behavior where dropped properties do not materialize
+      // against the catalog table entity. See SPARK-57670.
+      materializeStreamingTable("t", schema, Map("p.keep" -> "v", "p.stale" -> "old"))
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+      materializeStreamingTable("t", schema, Map("p.keep" -> "v"))
+      assert(RecordingInMemoryTableCatalog.recordedAlters.isEmpty)
+
+      assert(loadTableFromRecordingCatalog("t").properties().get("p.stale") == "old")
+    }
+  }
+}
+
+/**
+ * An [[InMemoryTableCatalog]] that records every `alterTable` invocation (into the shared companion
+ * buffer) while still applying it, so tests can assert whether materialization issued an alter or
+ * skipped it as a no-op.
+ */
+class RecordingInMemoryTableCatalog extends InMemoryTableCatalog {
+  override def alterTable(ident: Identifier, changes: TableChange*): V2Table = {
+    RecordingInMemoryTableCatalog.recordedAlters += changes.toSeq
+    super.alterTable(ident, changes: _*)
+  }
+}
+
+object RecordingInMemoryTableCatalog {
+  val recordedAlters: mutable.ArrayBuffer[Seq[TableChange]] =
+    mutable.ArrayBuffer.empty
+  def reset(): Unit = recordedAlters.clear()
 }

Review Comment:
   `recordedAlters` is global companion-object state, which only stays safe as 
long as nothing that uses this class ever runs concurrently in the same JVM. 
Since `CatalogManager` caches plugins by name (`catalogs.getOrElseUpdate(name, 
Catalogs.load(name, conf))`), every access within a `withRecordingCatalog` 
block resolves to the *same* instance, so the buffer can be per-instance and 
read back off the cached catalog. That removes the global state (and the need 
for `reset()`) entirely:
   
   ```suggestion
   /**
    * An [[InMemoryTableCatalog]] that records every `alterTable` invocation (into a per-instance
    * buffer) while still applying it, so tests can assert whether materialization issued an alter or
    * skipped it as a no-op. `CatalogManager` caches catalog plugins by name, so the same instance
    * backs every access within a `withRecordingCatalog` block and reads go through that cached instance.
    */
   class RecordingInMemoryTableCatalog extends InMemoryTableCatalog {
     val recordedAlters: mutable.ArrayBuffer[Seq[TableChange]] = mutable.ArrayBuffer.empty
   
     override def alterTable(ident: Identifier, changes: TableChange*): V2Table = {
       recordedAlters += changes.toSeq
       super.alterTable(ident, changes: _*)
     }
   }
   ```
   
   This needs a few matching changes on the test side:
   
   - Add an accessor next to `loadTableFromRecordingCatalog`:
     ```scala
     private def recordingCatalog: RecordingInMemoryTableCatalog =
       spark.sessionState.catalogManager
         .catalog(recordingCatalogName)
         .asInstanceOf[RecordingInMemoryTableCatalog]
     ```
   - Replace the `RecordingInMemoryTableCatalog.recordedAlters` references in 
the assertions with `recordingCatalog.recordedAlters`.
   - Drop both `RecordingInMemoryTableCatalog.reset()` calls in 
`withRecordingCatalog` — the `catalogManager.reset()` in the `finally` already 
clears the plugin cache, so the next block constructs a fresh instance with an 
empty buffer — and update the "resets its shared alter buffer" line in that 
method's scaladoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to