gengliangwang commented on code in PR #55653:
URL: https://github.com/apache/spark/pull/55653#discussion_r3176012299


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala:
##########
@@ -274,15 +274,54 @@ private class InMemoryChangelogOffset(val offset: Long) 
extends Offset {
 }
 
 /**
- * A [[MicroBatchStream]] that serves pre-populated change rows in a single 
batch.
+ * A [[MicroBatchStream]] that serves pre-populated change rows.
+ *
+ * Splits the rows into one micro-batch per commit by walking 
`_commit_version` and
+ * returning a strictly increasing `latestOffset()` per call -- mirroring how 
a real
+ * CDC source emits commits one at a time. Tests that rely on multi-batch 
behavior
+ * (watermark progression, append-mode aggregate eviction across batches, etc.)
+ * therefore actually exercise multiple batches; tests that just consume all 
rows
+ * still see the same final output via `processAllAvailable`.
  */
 private class InMemoryChangelogMicroBatchStream(
     schema: StructType,
     rows: Seq[InternalRow]) extends MicroBatchStream {
 
+  // Index of `_commit_version` in the row schema. The commit version column is
+  // produced by `InMemoryChangelog.cdcColumns` immediately after 
`_change_type`.
+  private val commitVersionIdx: Int = schema.fieldIndex("_commit_version")
+
+  // Last-row indices marking the end of each commit, in order. With rows
+  // [v1, v1, v2, v3, v3] this is [1, 2, 4]. Empty when `rows` is empty.
+  private val commitBoundaries: Array[Long] = {
+    val boundaries = scala.collection.mutable.ArrayBuffer.empty[Long]
+    var i = 0
+    while (i < rows.size) {
+      val v = rows(i).getLong(commitVersionIdx)
+      var j = i
+      while (j < rows.size && rows(j).getLong(commitVersionIdx) == v) j += 1
+      boundaries += (j - 1).toLong
+      i = j
+    }
+    boundaries.toArray
+  }
+
+  // Cursor into `commitBoundaries`. Advances one commit per call to
+  // `latestOffset()` until the last commit is reached, after which the
+  // returned offset is steady (idempotent steady state).
+  private var cursor: Int = 0
+
   override def initialOffset(): Offset = new InMemoryChangelogOffset(-1)
 
-  override def latestOffset(): Offset = new InMemoryChangelogOffset(rows.size 
- 1)
+  override def latestOffset(): Offset = {

Review Comment:
   Good point -- fixed in 9749df3 by switching the source to 
`SupportsAdmissionControl`:
   
   - `reportLatestOffset()` returns the final boundary (idempotent, computed 
once); this is what `processAllAvailable()` waits for, so the test driver 
blocks until every commit is committed.
   - `latestOffset(startOffset, limit)` returns the very next commit boundary 
strictly greater than `startOffset`; the engine uses this per-trigger to cap 
each micro-batch at one commit's worth of rows.
   
   Both methods are idempotent in their inputs -- no internal cursor, no 
per-call advancement -- so the engine can call them as many times as it wants 
before/after a commit without skipping commit boundaries.
   
   Also added an explicit assertion to the `streaming carry-over removal across 
multiple commits in separate batches` test that the query observed `>= 4` 
data-bearing micro-batches (one per input commit), so a regression that 
collapses the input into a single batch would be caught immediately rather than 
masked by the same final output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to