He-Pin commented on code in PR #517:
URL:
https://github.com/apache/pekko-persistence-jdbc/pull/517#discussion_r3370855709
##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala:
##########
@@ -42,62 +42,102 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
}
/**
- * separate this method for unit tests.
+ * Separate this method for unit tests.
Review Comment:
**🟡 suggestion**: The window size changed from `batchSize` to `batchSize +
1` sequence numbers. This is correct: the extra slot lets a window tolerate
one gap and still return a full batch. However,
`LimitWindowingStreamTest.scala:77` calls `internalBatchStream` directly. The
old code produced windows of `batchSize` sequence numbers (e.g., [101, 200]
for `batchSize = 100`); the new code produces `batchSize + 1` (e.g.,
[101, 201]). The test only checks batch and message counts, not ranges, so it
still passes for consecutive messages, but it would fail if it were ever
extended to check `queriedRanges` against the old window bounds. Consider
updating the test or adding a comment there.
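
To make the arithmetic concrete, here is a minimal sketch of the two window
sizes. It assumes a window is an inclusive range of sequence numbers, as the
[101, 200] example implies. `WindowSketch`, `window`, and `messagesIn` are
hypothetical helpers written for illustration and are not part of the PR.

```scala
// Illustrative only: models a window as an inclusive range of sequence numbers.
// None of these names exist in pekko-persistence-jdbc.
object WindowSketch {
  // Inclusive window [from, from + size - 1].
  def window(from: Long, size: Int): (Long, Long) = (from, from + size - 1)

  // Sequence numbers inside the window that are actually present,
  // given a set of missing (gap) sequence numbers.
  def messagesIn(w: (Long, Long), missing: Set[Long]): Seq[Long] =
    (w._1 to w._2).filterNot(missing.contains)

  def main(args: Array[String]): Unit = {
    val batchSize = 100
    val missing = Set(150L) // one hypothetical gap in the journal

    val oldWindow = window(101L, batchSize)     // (101, 200)
    val newWindow = window(101L, batchSize + 1) // (101, 201)

    println(messagesIn(oldWindow, missing).size) // 99: one short of a full batch
    println(messagesIn(newWindow, missing).size) // 100: a full batch despite the gap
  }
}
```

With no gaps, both windows contain at least `batchSize` messages, which is
consistent with the count-only assertions still passing.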
##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala:
##########
@@ -42,62 +42,102 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
}
/**
- * separate this method for unit tests.
+ * Separate this method for unit tests.
*/
@InternalApi
private[dao] def internalBatchStream(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
batchSize: Int,
- refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
- val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
+ refreshInterval: Option[(FiniteDuration, Scheduler)]
+ ): Source[Seq[Try[(PersistentRepr, Long)]], NotUsed] = {
+
+ val normalizedBatchSize = Math.max(1, batchSize)
+ val normalizedFromSequenceNr = Math.max(1, fromSequenceNr)
+
+ def pollOrComplete(from: Long): QueryPlan = refreshInterval match {
+ case Some((delay, scheduler)) => PollRemaining(from, delay, scheduler)
Review Comment:
**✅ good**: Treating `from > toSequenceNr` as `isEmptyRange` correctly
prevents a live stream from polling a range that can never produce messages.
`MessagesWithBatchTest` covers this `fromSequenceNr > toSequenceNr` case.
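
For readers without the full diff, the behaviour described above can be
sketched roughly as follows. This is a simplified stand-in, not the PR's code:
`QueryPlanSketch` and `Complete` are hypothetical names, `PollRemaining` here
omits the scheduler, and `toSequenceNr` is passed explicitly rather than
captured from the enclosing method.

```scala
import scala.concurrent.duration._

// Simplified, hypothetical model of the plan selection; the real
// PollRemaining in the PR also carries a Scheduler.
object QueryPlanSketch {
  sealed trait QueryPlan
  final case class PollRemaining(from: Long, delay: FiniteDuration) extends QueryPlan
  case object Complete extends QueryPlan // assumed name for "stop the stream"

  def pollOrComplete(
      from: Long,
      toSequenceNr: Long,
      refreshInterval: Option[FiniteDuration]): QueryPlan = {
    // An empty range can never yield messages, so polling it would never end.
    val isEmptyRange = from > toSequenceNr
    refreshInterval match {
      case Some(delay) if !isEmptyRange => PollRemaining(from, delay)
      case _                            => Complete
    }
  }
}
```

In this sketch a live stream (one with a refresh interval) only keeps polling
while `from <= toSequenceNr`; otherwise it completes, as does any stream
without a refresh interval.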
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]