viirya commented on code in PR #55653:
URL: https://github.com/apache/spark/pull/55653#discussion_r3175623037
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala:
##########
@@ -274,15 +274,54 @@ private class InMemoryChangelogOffset(val offset: Long)
extends Offset {
}
/**
- * A [[MicroBatchStream]] that serves pre-populated change rows in a single batch.
+ * A [[MicroBatchStream]] that serves pre-populated change rows.
+ *
+ * Splits the rows into one micro-batch per commit by walking `_commit_version` and
+ * returning a strictly increasing `latestOffset()` per call -- mirroring how a real
+ * CDC source emits commits one at a time. Tests that rely on multi-batch behavior
+ * (watermark progression, append-mode aggregate eviction across batches, etc.)
+ * therefore actually exercise multiple batches; tests that just consume all rows
+ * still see the same final output via `processAllAvailable`.
*/
private class InMemoryChangelogMicroBatchStream(
schema: StructType,
rows: Seq[InternalRow]) extends MicroBatchStream {
+ // Index of `_commit_version` in the row schema. The commit version column is
+ // produced by `InMemoryChangelog.cdcColumns` immediately after `_change_type`.
+ private val commitVersionIdx: Int = schema.fieldIndex("_commit_version")
+
+ // Last-row indices marking the end of each commit, in order. With rows
+ // [v1, v1, v2, v3, v3] this is [1, 2, 4]. Empty when `rows` is empty.
+ private val commitBoundaries: Array[Long] = {
+ val boundaries = scala.collection.mutable.ArrayBuffer.empty[Long]
+ var i = 0
+ while (i < rows.size) {
+ val v = rows(i).getLong(commitVersionIdx)
+ var j = i
+ while (j < rows.size && rows(j).getLong(commitVersionIdx) == v) j += 1
+ boundaries += (j - 1).toLong
+ i = j
+ }
+ boundaries.toArray
+ }
+
+ // Cursor into `commitBoundaries`. Advances one commit per call to
+ // `latestOffset()` until the last commit is reached, after which the
+ // returned offset is steady (idempotent steady state).
+ private var cursor: Int = 0
+
override def initialOffset(): Offset = new InMemoryChangelogOffset(-1)
- override def latestOffset(): Offset = new InMemoryChangelogOffset(rows.size - 1)
+ override def latestOffset(): Offset = {
Review Comment:
One concern with the new test stream: `latestOffset()` advances `cursor` as
a side effect. That makes the available offset change on every offset lookup,
rather than at a committed or otherwise controlled point in the source's
progression.

This seems a bit fragile because the correctness of “one commit per
micro-batch” now depends on the streaming engine calling `latestOffset()`
exactly once per trigger before planning/committing. If a future code path,
retry, restart behavior, or test helper calls `latestOffset()` more than once
before the previous offset is processed, the fake source could skip commit
boundaries and make the tests flaky or less representative.

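To make the skip concrete, here is a minimal standalone model, not the actual test class. Plain `Seq[Long]` commit versions stand in for the `InternalRow`s, a `Long` stands in for `InMemoryChangelogOffset`, and `run` is a hypothetical driver that plans each batch up to the last offset it saw. The hunk is cut off before the body of the new `latestOffset()`, so the model assumes it returns `commitBoundaries(cursor)` and then advances `cursor`, as the comment on `cursor` describes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical standalone model of the new stream's offset logic; NOT the Spark class.
object SideEffectingOffsetModel {
  final class Stream(versions: Seq[Long]) {
    val initialOffset: Long = -1L

    // Last-row index of each run of equal commit versions: [1, 1, 2, 3, 3] -> [1, 2, 4].
    val commitBoundaries: Array[Long] = {
      val out = ArrayBuffer.empty[Long]
      var i = 0
      while (i < versions.size) {
        var j = i
        while (j < versions.size && versions(j) == versions(i)) j += 1
        out += (j - 1).toLong
        i = j
      }
      out.toArray
    }

    private var cursor = 0

    // ASSUMED body: return the boundary under the cursor, then advance it.
    // Every call moves the cursor until the last commit is reached.
    def latestOffset(): Long =
      if (commitBoundaries.isEmpty) initialOffset
      else {
        val off = commitBoundaries(cursor)
        if (cursor < commitBoundaries.length - 1) cursor += 1
        off
      }
  }

  // Hypothetical driver: each trigger calls latestOffset() `lookupsPerTrigger` times,
  // plans the batch (start, end] from the last value seen, and stops when nothing is new.
  def run(versions: Seq[Long], lookupsPerTrigger: Int): Seq[(Long, Long)] = {
    val stream = new Stream(versions)
    val batches = ArrayBuffer.empty[(Long, Long)]
    var start = stream.initialOffset
    var done = false
    while (!done) {
      var end = start
      for (_ <- 1 to lookupsPerTrigger) end = stream.latestOffset()
      if (end == start) done = true
      else {
        batches += ((start, end))
        start = end
      }
    }
    batches.toList
  }
}
```

With one lookup per trigger, rows `[v1, v1, v2, v3, v3]` are planned as three batches, one per commit. With two lookups per trigger, v1 and v2 collapse into the single batch `(-1, 2]`.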
Could we make `latestOffset()` idempotent for the current available offset,
and advance the cursor from `commit(end)` or another explicit test-controlled
progression point? Alternatively, adding an assertion that the query actually
processes multiple micro-batches would make the intended behavior much easier
to trust.
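A minimal sketch of the first suggestion, in the same standalone style (a `Long` offset and a hypothetical `run` driver, not the Spark streaming engine): `latestOffset()` becomes a pure read of the boundary under the cursor, and only `commit(end)` moves the cursor past `end`. It takes precomputed `commitBoundaries`, e.g. `[1, 2, 4]` for rows `[v1, v1, v2, v3, v3]` as in the hunk's comment. The driver assumes `commit(end)` is delivered before the next `latestOffset()` lookup. A source built this way only makes progress if the engine follows the same order, so that ordering would need to be checked against the engine actually in use.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: the cursor advances on commit(end), not on latestOffset().
object CommitDrivenOffsetModel {
  final class Stream(commitBoundaries: Array[Long]) {
    val initialOffset: Long = -1L
    private var cursor = 0

    // Idempotent: repeated lookups return the same offset until commit() runs.
    def latestOffset(): Long =
      if (commitBoundaries.isEmpty) initialOffset else commitBoundaries(cursor)

    // Move the cursor to the first boundary beyond `end` (or stay on the last one).
    def commit(end: Long): Unit = {
      while (cursor < commitBoundaries.length - 1 && commitBoundaries(cursor) <= end) {
        cursor += 1
      }
    }
  }

  // Hypothetical driver: any number of lookups per trigger, then commit(end) once
  // the batch (start, end] has been "processed".
  def run(commitBoundaries: Array[Long], lookupsPerTrigger: Int): Seq[(Long, Long)] = {
    val stream = new Stream(commitBoundaries)
    val batches = ArrayBuffer.empty[(Long, Long)]
    var start = stream.initialOffset
    var done = false
    while (!done) {
      var end = start
      for (_ <- 1 to lookupsPerTrigger) end = stream.latestOffset()
      if (end == start) done = true
      else {
        batches += ((start, end))
        stream.commit(end)
        start = end
      }
    }
    batches.toList
  }
}
```

Because lookups no longer move the cursor, `run(Array(1L, 2L, 4L), 1)` and `run(Array(1L, 2L, 4L), 3)` plan the same three batches, one per commit.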
--
This is an automated message from the Apache Git Service.