viirya commented on code in PR #55636:
URL: https://github.com/apache/spark/pull/55636#discussion_r3171449505
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala:
##########
@@ -662,4 +663,143 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
}
assert(e.getMessage.contains("changes"))
}
+
+ // ---------- Streaming: row-level post-processing ----------
+ //
+ // Streaming row-level passes (carry-over removal, update detection) rewrite the plan
+ // into Aggregate(rowId, _commit_version, _commit_timestamp) -> [Filter] ->
+ // Generate(Inline(events)) -> [relabel Project], under an EventTimeWatermark on
+ // _commit_timestamp.
+
+ /** Schema variant for post-processing tests: includes `row_commit_version`. */
+ private def recreateWithRowVersion(): Identifier = {
+ val id = ident
+ val cat = catalog
+ if (cat.tableExists(id)) cat.dropTable(id)
+ cat.createTable(
+ id,
+ Array(
+ Column.create("id", LongType, false),
+ Column.create("data", StringType),
+ Column.create("row_commit_version", LongType, false)),
+ Array.empty,
+ new util.HashMap[String, String]())
+ cat.clearChangeRows(id)
+ id
+ }
+
+ /** Row constructor for the row-version-enabled schema. */
+ private def ppRow(
+ id: Long,
+ data: String,
+ rcv: Long,
+ changeType: String,
+ commitVersion: Long,
+ commitTimestampMicros: Long): InternalRow = {
+ InternalRow(
+ id,
+ UTF8String.fromString(data),
+ rcv,
+ UTF8String.fromString(changeType),
+ commitVersion,
+ commitTimestampMicros)
+ }
+
+ test("streaming carry-over removal drops CoW pairs") {
+ val id = recreateWithRowVersion()
+ catalog.setChangelogProperties(id, ChangelogProperties(
+ containsCarryoverRows = true,
+ rowIdNames = Seq("id"),
+ rowVersionName = Some("row_commit_version")))
+
+ catalog.addChangeRows(id, Seq(
+ // v1: insert Alice (rcv=1), Bob (rcv=1)
+ ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
+ ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
+ // v2: real delete Alice + carry-over for Bob (rcv unchanged)
+ ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
+ ppRow(2L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
+ ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 2L, 2000000L)))
+
+ val q = spark.readStream
+ .option("startingVersion", "1")
+ .changes(fullTableName)
+ .select("id", "data", "_change_type", "_commit_version")
+ .writeStream
+ .format("memory")
+ .queryName("cdc_stream_carryover")
+ .outputMode("append")
Review Comment:
Can we also add tests for update/complete mode?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]