gengliangwang commented on code in PR #55653:
URL: https://github.com/apache/spark/pull/55653#discussion_r3176956567


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala:
##########
@@ -274,15 +274,54 @@ private class InMemoryChangelogOffset(val offset: Long) 
extends Offset {
 }
 
 /**
- * A [[MicroBatchStream]] that serves pre-populated change rows in a single 
batch.
+ * A [[MicroBatchStream]] that serves pre-populated change rows.
+ *
+ * Splits the rows into one micro-batch per commit by walking 
`_commit_version` and
+ * returning a strictly increasing `latestOffset()` per call -- mirroring how 
a real
+ * CDC source emits commits one at a time. Tests that rely on multi-batch 
behavior
+ * (watermark progression, append-mode aggregate eviction across batches, etc.)
+ * therefore actually exercise multiple batches; tests that just consume all 
rows
+ * still see the same final output via `processAllAvailable`.
  */
 private class InMemoryChangelogMicroBatchStream(
     schema: StructType,
     rows: Seq[InternalRow]) extends MicroBatchStream {
 
+  // Index of `_commit_version` in the row schema. The commit version column is
+  // produced by `InMemoryChangelog.cdcColumns` immediately after 
`_change_type`.
+  private val commitVersionIdx: Int = schema.fieldIndex("_commit_version")
+
+  // Last-row indices marking the end of each commit, in order. With rows
+  // [v1, v1, v2, v3, v3] this is [1, 2, 4]. Empty when `rows` is empty.
+  private val commitBoundaries: Array[Long] = {
+    val boundaries = scala.collection.mutable.ArrayBuffer.empty[Long]
+    var i = 0
+    while (i < rows.size) {
+      val v = rows(i).getLong(commitVersionIdx)
+      var j = i
+      while (j < rows.size && rows(j).getLong(commitVersionIdx) == v) j += 1
+      boundaries += (j - 1).toLong
+      i = j
+    }
+    boundaries.toArray
+  }
+
+  // Cursor into `commitBoundaries`. Advances one commit per call to
+  // `latestOffset()` until the last commit is reached, after which the
+  // returned offset is steady (idempotent steady state).
+  private var cursor: Int = 0
+
   override def initialOffset(): Offset = new InMemoryChangelogOffset(-1)
 
-  override def latestOffset(): Offset = new InMemoryChangelogOffset(rows.size 
- 1)
+  override def latestOffset(): Offset = {

Review Comment:
   Update: after rebasing onto current `master` (which includes #55637, the 
netChanges TransformWithState path), I had to revert the 
`SupportsAdmissionControl` multi-batch test infra introduced in 9749df3.
   
   Root cause: the netChanges processor emits via event-time timers fired at 
`_commit_timestamp`. With multi-batch on, when the last commit's rows arrive in 
the final micro-batch the watermark is advanced *to* that commit's timestamp at 
trigger start -- but a timer registered at exactly that timestamp does not fire 
(the engine only fires when `watermark > timer_ts`), and there is no subsequent 
batch to bump it further. In single-batch mode, the end-of-input flush emits 
every pending timer regardless, so the test passed.
   
   Rather than reshape the netChanges processor or watermark contract just to 
keep this test stub multi-batch, I reverted `InMemoryChangelogMicroBatchStream` 
back to its original single-batch behavior. The expanded E2E coverage in this 
follow-up (composite rowId, multi-commit, etc.) still exercises the row-level 
streaming rewrite end to end, just in a single batch.
   
   If we want true multi-batch verification down the road, the right place is 
probably a dedicated suite that uses `MemoryStream` (or similar) with explicit 
`processAllAvailable()` between commit groups, paired with a no-data trigger to 
drain the final commit's timers -- out of scope for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to