mucciolo opened a new issue, #516:

URL: https://github.com/apache/pekko-persistence-jdbc/issues/516
## Summary

`BaseJournalDaoWithReadMessages.internalBatchStream` stops the event stream prematurely when it encounters a gap in sequence numbers, silently skipping all remaining events. Gaps occur whenever events have been removed from the journal, either by deleting rows or by setting the `deleted` flag to `true`. Both cases reproduce identically, because both query layers filter on `deleted = false` before applying the limit ([JournalQueries.scala#L103](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala#L103), [ReadJournalQueries.scala#L38](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala#L38)).

## Affected versions

Introduced by #180 (commit ab2f6ce) and partially mitigated by #195 (commit 7939510), which fixed only gaps at the head of the journal (the snapshot cleanup pattern) by exempting the first batch from windowing. Replay across mid-stream gaps is broken in every release from v1.1.0-M1-RC1 through the 2.0.0 milestones, as well as on current `main` (4da84ad).

Verified by running the reproduction suite below against ab2f6ce~1 (all pass), ab2f6ce (scenarios 3 to 7 fail), and 7939510 and current main (scenarios 3 to 6 fail).

## Mechanism

Before #180, every batch queried the full remaining range `[from, toSequenceNr]` with `take(batchSize)`, so a batch shorter than `batchSize` reliably meant the stream was exhausted.
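To make that pre-#180 contract concrete, here is a minimal in-memory model of the loop. It is a sketch under stated assumptions, not the project's DAO code: the journal is modelled as the set of live sequence numbers (so hard and soft deletes both look like missing numbers, as they do once `deleted = false` is filtered before the limit), and `PreWindowPaging`, `query` and `replay` are hypothetical names.

```scala
// Sketch only: an in-memory model of the pre-#180 paging loop, not the real DAO.
// Assumption: the journal is the set of live (non-deleted) sequence numbers, so
// hard and soft deletes both look like missing numbers.
object PreWindowPaging {

  // One batch query: live events in [from, to], ascending, at most batchSize rows.
  def query(live: Set[Long], from: Long, to: Long, batchSize: Int): List[Long] =
    live.filter(s => s >= from && s <= to).toList.sorted.take(batchSize)

  // Every batch queries the whole remaining range [from, toSequenceNr], so a
  // batch shorter than batchSize can only mean no live events are left.
  def replay(live: Set[Long], fromSequenceNr: Long, toSequenceNr: Long, batchSize: Int): List[Long] = {
    require(batchSize > 0, "batchSize must be positive")

    @annotation.tailrec
    def loop(from: Long, acc: Vector[Long]): Vector[Long] = {
      val batch = query(live, from, toSequenceNr, batchSize)
      val out = acc ++ batch
      if (batch.size < batchSize) out // short batch: the stream is exhausted
      else loop(batch.last + 1, out)  // full batch: continue after the last event
    }

    loop(fromSequenceNr, Vector.empty).toList
  }
}
```

With live events 1 and 4 and a batch size of 1, this model returns both events: after event 1, the query for `[2, 4]` still reaches event 4, so the gap never produces a short batch before the end of the range.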
Since #180, every batch after the first is windowed to `[from, from + batchSize]` ([BaseJournalDaoWithReadMessages.scala#L58-L64](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L58-L64)), but the exhaustion check was not updated. A window with fewer live events than `batchSize` sets `hasMoreEvents = false` ([line 70](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L70)), and with `refreshInterval = None` the stream stops ([line 81](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L81)) even though live events exist beyond the window. With a refresh interval, an empty window never advances `nextFrom` ([line 87](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L87)), so the stream re-queries the same empty window instead of progressing.

With batch size B, the window `[from, from + B]` spans B + 1 consecutive sequence numbers, so if 2 or more of them are missing it holds at most B - 1 live events and the check above fires. Any non-first window with 2 or more missing entries therefore truncates the stream: with the default `replayBatchSize = 400` ([reference.conf#L187](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/resources/reference.conf#L187)), 2 missing events within any 401-number window are enough. The gap does not need to be wider than the batch size.

## Impact

1. **Actor recovery** (`JdbcAsyncWriteJournal.asyncReplayMessages`, [JdbcAsyncWriteJournal.scala#L126](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala#L126)): silent event loss; the actor recovers partial state with no error.
2. **`currentEventsByPersistenceId`** ([JdbcReadJournal.scala#L191](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala#L191)): the stream completes early and the remaining events are never emitted.
3. **Live `eventsByPersistenceId`**: events beyond the gap are not delivered. In the reproduction below (scenario 6, polling every 50 ms), the stream delivered event 1 and nothing further within a 5-second timeout.

## Reproduction

Each scenario runs through `messagesWithBatch` on an H2-backed `DefaultJournalDao` with `fromSequenceNr = 1` and `toSequenceNr` set to the highest sequence number. "Hard" means the row was deleted; "soft" means its `deleted` flag was set to `true`. Bold entries differ from the expected output.

| # | Live seqNrs | Deleted seqNrs | batchSize | refreshInterval | Expected | ab2f6ce~1 | ab2f6ce (#180) | 7939510 (#195) and main |
|---|---|---|---|---|---|---|---|---|
| 1 | 1, 3 | 2 (hard) | 2 | None | 1, 3 | ok | ok | ok |
| 2 | 1, 3 | 2 (hard) | 1 | None | 1, 3 | ok | ok | ok |
| 3 | 1, 4 | 2, 3 (hard) | 1 | None | 1, 4 | ok | **1** | **1** |
| 4 | 1, 4 | 2, 3 (soft) | 1 | None | 1, 4 | ok | **1** | **1** |
| 5 | 1, 2, 4, 6 | 3, 5 (hard) | 2 | None | 1, 2, 4, 6 | ok | **1, 2, 4** | **1, 2, 4** |
| 6 | 1, 4 | 2, 3 (hard) | 1 | 50 ms | 1, 4 | ok | **1, then nothing** | **1, then nothing** |
| 7 | 3 | 1, 2 (hard) | 1 | None | 3 | ok | **nothing** | ok |

Scenario 7 is the head-of-journal case that #195 fixed; it is included to document that #195 did not address gaps occurring after the first batch. Since #195 the first batch is exempt from windowing, so on 7939510 and main every failure occurs in a later window.

## Expected behavior

The stream emits every live event in `[fromSequenceNr, toSequenceNr]` regardless of gaps, as it did before 1.1.0. A batch shorter than `batchSize` must not be treated as exhaustion unless the window has reached `toSequenceNr`.

I have a test suite reproducing all seven scenarios above and intend to submit a PR with the fix.
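The windowing and the exhaustion rule can be checked against the reproduction table with a similar in-memory model. This is a hedged sketch, not the actual `internalBatchStream` code or the intended PR: it models only `refreshInterval = None`, treats the journal as the set of live sequence numbers, caps each window at `toSequenceNr` (an assumption of the model), and `WindowedPaging`, `windowedReplay` and `fixExhaustion` are hypothetical names. With `fixExhaustion = false` any short batch ends the stream, as described under Mechanism; with `fixExhaustion = true` a short batch ends it only once the window has reached `toSequenceNr`, as required under Expected behavior.

```scala
// Sketch only: an in-memory model of the post-#195 windowed paging loop with
// refreshInterval = None. Not the real internalBatchStream; names are hypothetical.
// Assumption: the journal is the set of live (non-deleted) sequence numbers.
object WindowedPaging {

  // One batch query: live events in [from, to], ascending, at most batchSize rows.
  def query(live: Set[Long], from: Long, to: Long, batchSize: Int): List[Long] =
    live.filter(s => s >= from && s <= to).toList.sorted.take(batchSize)

  def windowedReplay(
      live: Set[Long],
      fromSequenceNr: Long,
      toSequenceNr: Long,
      batchSize: Int,
      fixExhaustion: Boolean): List[Long] = {
    require(batchSize > 0, "batchSize must be positive")

    @annotation.tailrec
    def loop(from: Long, first: Boolean, acc: Vector[Long]): Vector[Long] =
      if (from > toSequenceNr) acc
      else {
        // The first batch spans the whole remaining range (the #195 exemption);
        // later batches are windowed to [from, from + batchSize], capped here at
        // toSequenceNr (an assumption of this model).
        val upper = if (first) toSequenceNr else math.min(from + batchSize, toSequenceNr)
        val batch = query(live, from, upper, batchSize)
        val out = acc ++ batch
        val short = batch.size < batchSize

        // Buggy rule: any short batch ends the stream.
        // Fixed rule: a short batch ends it only once the window reached toSequenceNr.
        val exhausted = if (fixExhaustion) short && upper >= toSequenceNr else short

        // A short batch already holds every live event in [from, upper], so the
        // next window can start after it; a full batch resumes after its last event.
        val next = if (short) upper + 1 else batch.last + 1

        if (exhausted) out else loop(next, false, out)
      }

    loop(fromSequenceNr, true, Vector.empty).toList
  }
}
```

With `fixExhaustion = false` the model reproduces the truncated outputs of scenarios 3 and 5 (`1` and `1, 2, 4`); with `fixExhaustion = true` it returns the expected `1, 4` and `1, 2, 4, 6`. Resuming after the window on a short batch, rather than after the last event read, also means an empty window is never queried twice in this model.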
