He-Pin commented on code in PR #517:
URL: 
https://github.com/apache/pekko-persistence-jdbc/pull/517#discussion_r3370855709


##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala:
##########
@@ -42,62 +42,102 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
   }
 
   /**
-   * separate this method for unit tests.
+   * Separate this method for unit tests.

Review Comment:
   **🟡 suggestion**: The window size changed from `batchSize` to `batchSize + 
1` sequence numbers. This is correct — the extra slot lets a window tolerate 
one gap and still return a full batch. However, 
`LimitWindowingStreamTest.scala:77` calls `internalBatchStream` directly. The 
old code produced windows of `batchSize` numbers (e.g., [101, 200]); the new 
code produces `batchSize + 1` (e.g., [101, 201]). The test only checks 
batch/message counts (not ranges), so it still passes for consecutive messages. 
But if it's ever extended to check `queriedRanges`, it will fail. Consider 
updating it or adding a comment.



##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala:
##########
@@ -42,62 +42,102 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
   }
 
   /**
-   * separate this method for unit tests.
+   * Separate this method for unit tests.
    */
   @InternalApi
   private[dao] def internalBatchStream(
       persistenceId: String,
       fromSequenceNr: Long,
       toSequenceNr: Long,
       batchSize: Int,
-      refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
-    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
+      refreshInterval: Option[(FiniteDuration, Scheduler)]
+  ): Source[Seq[Try[(PersistentRepr, Long)]], NotUsed] = {
+
+    val normalizedBatchSize = Math.max(1, batchSize)
+    val normalizedFromSequenceNr = Math.max(1, fromSequenceNr)
+
+    def pollOrComplete(from: Long): QueryPlan = refreshInterval match {
+      case Some((delay, scheduler)) => PollRemaining(from, delay, scheduler)

Review Comment:
   **✅ good**: Handling `from > toSequenceNr` as `isEmptyRange` correctly 
prevents a live stream from polling a range that can never produce messages. 
This edge case is important for the `fromSequenceNr > toSequenceNr` scenario 
(tested in `MessagesWithBatchTest`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to