andreaschat-db commented on code in PR #55462:
URL: https://github.com/apache/spark/pull/55462#discussion_r3225858027
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3540,207 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Repeated table access with external changes (no CACHE TABLE).
+ // Each sql() call creates a fresh QueryExecution, so it always sees
+ // the latest data, schema, and table identity.
+
+ // Scenario 1: external writes
+
+ test("repeated sql() reflects session write") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ test("repeated sql() reflects external write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1 connector w/ cache (external write, caching connector)
+ test("connector w/ cache: repeated sql() stale after external write") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via catalog API (bypasses cache)
+ externalAppend(catalogName = "cachingcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ // Caching connector returns stale table: external write invisible
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write becomes visible
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2: external schema changes
+
Review Comment:
nit: Remove empty line.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3540,207 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Repeated table access with external changes (no CACHE TABLE).
+ // Each sql() call creates a fresh QueryExecution, so it always sees
+ // the latest data, schema, and table identity.
+
+ // Scenario 1: external writes
+
+ test("repeated sql() reflects session write") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ test("repeated sql() reflects external write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1 connector w/ cache (external write, caching connector)
+ test("connector w/ cache: repeated sql() stale after external write") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via catalog API (bypasses cache)
+ externalAppend(catalogName = "cachingcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ // Caching connector returns stale table: external write invisible
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write becomes visible
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2: external schema changes
+
+ test("repeated sql() reflects session schema change") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ sql(s"ALTER TABLE $t ADD COLUMN new_col INT")
+ sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+
+ test("repeated sql() reflects external schema change") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external schema change + data write via catalog API
+ val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_col INT"), InternalRow(2, 200, -1))
+
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+
+ // Scenario 2 connector w/ cache (external schema change, caching connector)
+ test("connector w/ cache: repeated sql() stale after external schema change") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // external schema change + data via catalog API
+ val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+ catalog("cachingcat").alterTable(ident, addCol)
+
+ externalAppend(catalogName = "cachingcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_col INT"), InternalRow(2, 200, -1))
+
+ // Caching connector returns stale table: external changes invisible
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, schema change + data visible
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(
+ sql(s"SELECT * FROM $t"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+
+ // Scenario 3: drop and recreate table
+
Review Comment:
nit: Remove empty line.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3540,207 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Repeated table access with external changes (no CACHE TABLE).
+ // Each sql() call creates a fresh QueryExecution, so it always sees
+ // the latest data, schema, and table identity.
+
+ // Scenario 1: external writes
+
Review Comment:
nit: Remove empty line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]