mucciolo opened a new issue, #516:
URL: https://github.com/apache/pekko-persistence-jdbc/issues/516

   ## Summary
   
   `BaseJournalDaoWithReadMessages.internalBatchStream` stops the event stream prematurely when it
   encounters a gap in sequence numbers, silently skipping all remaining events. Gaps occur whenever
   events have been removed from the journal, either by deleting rows or by setting the `deleted`
   flag to `true`; both reproduce identically because both query layers filter `deleted = false`
   before applying the limit
   ([JournalQueries.scala#L103](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala#L103),
   [ReadJournalQueries.scala#L38](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala#L38)).
   
   ## Affected versions
   
   Introduced by #180 (commit ab2f6ce), partially mitigated by #195 (commit 
7939510), which fixed
   only gaps at the head of the journal (the snapshot cleanup pattern) by 
exempting the first batch
   from windowing. Mid-stream gaps are broken in every release from 
v1.1.0-M1-RC1 through the 2.0.0
   milestones and current `main` (4da84ad). Verified by running the 
reproduction suite below against
   ab2f6ce~1 (all pass), ab2f6ce (scenarios 3 to 7 fail), 7939510 and current 
main (scenarios 3 to 6
   fail).
   
   ## Mechanism
   
   Before #180 every batch queried the full remaining range `[from, toSequenceNr]` with
   `take(batchSize)`, so a batch shorter than `batchSize` reliably meant the stream was exhausted.
   Since #180 every batch after the first is windowed to `[from, from + batchSize]`
   ([BaseJournalDaoWithReadMessages.scala#L58-L64](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L58-L64)),
   but the exhaustion check was not updated: a window with fewer live events than `batchSize` sets
   `hasMoreEvents = false`
   ([line 70](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L70)),
   and with `refreshInterval = None` the stream stops
   ([line 81](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L81))
   even though live events exist beyond the window. With a refresh interval, an empty window never
   advances `nextFrom`
   ([line 87](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala#L87)),
   so the stream re-queries the same empty window instead of progressing.
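
The stop condition described above can be modelled without a database. The following is a minimal sketch, not the library code: `WindowedReplayModel`, `query` and `replay` are hypothetical names, the journal is reduced to a list of live sequence numbers, and only the `refreshInterval = None` path is modelled. `windowed = false` approximates the pre-#180 behaviour; `windowed = true` approximates current `main`, where the first batch is exempt from windowing.

```scala
import scala.annotation.tailrec

// Simplified, in-memory model of the batch loop (an assumption, not the real DAO).
object WindowedReplayModel {

  // Stand-in for the SQL query: live sequence numbers in [from, to],
  // ascending, limited to `limit` rows.
  def query(live: Seq[Long], from: Long, to: Long, limit: Int): Seq[Long] =
    live.sorted.filter(n => n >= from && n <= to).take(limit)

  // Replays with no refresh interval and returns the emitted sequence numbers.
  def replay(
      live: Seq[Long],
      fromSeqNr: Long,
      toSeqNr: Long,
      batchSize: Int,
      windowed: Boolean): Seq[Long] = {
    require(batchSize > 0, "the model only covers positive batch sizes")
    val first = math.max(1L, fromSeqNr)

    @tailrec
    def loop(from: Long, emitted: Vector[Long]): Vector[Long] = {
      // The first batch always queries up to toSeqNr; later batches are
      // limited to [from, from + batchSize] when windowing is enabled.
      val upper =
        if (!windowed || from == first) toSeqNr
        else math.min(from + batchSize, toSeqNr)
      val batch = query(live, from, upper, batchSize)
      val out = emitted ++ batch

      val hasMoreEvents = batch.size == batchSize // the unchanged exhaustion check
      val hasLastEvent = batch.lastOption.exists(_ >= toSeqNr)

      if (hasLastEvent || from > toSeqNr || !hasMoreEvents) out
      else loop(batch.last + 1, out)
    }

    loop(first, Vector.empty)
  }
}
```

In this model, `replay(Seq(1L, 4L), 1L, 4L, 1, windowed = true)` emits only `1`, because the second window `[2, 3]` is empty and is taken for exhaustion, while the same call with `windowed = false` emits `1, 4`.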
   
   With batch size B, any non-first window of B + 1 consecutive sequence numbers containing 2 or
   more missing entries truncates the stream, so with the default `replayBatchSize = 400`
   ([reference.conf#L187](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/resources/reference.conf#L187))
   2 missing events within any 401-number window are enough. The gap does not need to be wider than
   the batch size.
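
The threshold can be checked for a single window in isolation. This is a small sketch under the same reading of the mechanism; `WindowCheck` and `truncates` are hypothetical names, and the set of live sequence numbers stands in for the journal table.

```scala
// Checks one non-first window [from, from + batchSize] in isolation.
object WindowCheck {

  // True if the window yields fewer than batchSize live events although
  // live events still exist beyond it, i.e. the stream would stop early.
  def truncates(live: Set[Long], from: Long, toSeqNr: Long, batchSize: Int): Boolean = {
    val upper = math.min(from + batchSize, toSeqNr)
    val batch = (from to upper).filter(live.contains).take(batchSize)
    val liveBeyondWindow = live.exists(n => n > upper && n <= toSeqNr)
    batch.size < batchSize && liveBeyondWindow
  }
}
```

With 1000 events and `batchSize = 400`, the window starting at 401 covers 401 to 801. If only event 500 is missing, the window still holds 400 live events and `truncates` is `false`; if events 500 and 600 are both missing, it holds 399 and `truncates` is `true`, so events 802 to 1000 would be lost.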
   
   ## Impact
   
   1. **Actor recovery** (`JdbcAsyncWriteJournal.asyncReplayMessages`,
      [JdbcAsyncWriteJournal.scala#L126](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala#L126)):
      silent event loss; the actor recovers partial state with no error.
   2. **`currentEventsByPersistenceId`**
      ([JdbcReadJournal.scala#L191](https://github.com/apache/pekko-persistence-jdbc/blob/4da84ad/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala#L191)):
      the stream completes early and the remaining events are never emitted.
   3. **Live `eventsByPersistenceId`**: events beyond the gap are not delivered. In the reproduction
      below (scenario 6, polling every 50 ms) the stream delivered event 1 and nothing further
      within a 5 second timeout.
   
   ## Reproduction
   
   Through `messagesWithBatch` on an H2-backed `DefaultJournalDao` 
(`fromSequenceNr = 1`,
   `toSequenceNr` = highest sequence number):
   
   | # | Live seqNrs | Deleted | batchSize | refreshInterval | Expected | ab2f6ce~1 | ab2f6ce (#180) | 7939510 (#195) and main |
   |---|---|---|---|---|---|---|---|---|
   | 1 | 1, 3 | 2 (hard) | 2 | None | 1, 3 | ok | ok | ok |
   | 2 | 1, 3 | 2 (hard) | 1 | None | 1, 3 | ok | ok | ok |
   | 3 | 1, 4 | 2, 3 (hard) | 1 | None | 1, 4 | ok | **1** | **1** |
   | 4 | 1, 4 | 2, 3 (soft) | 1 | None | 1, 4 | ok | **1** | **1** |
   | 5 | 1, 2, 4, 6 | 3, 5 (hard) | 2 | None | 1, 2, 4, 6 | ok | **1, 2, 4** | **1, 2, 4** |
   | 6 | 1, 4 | 2, 3 (hard) | 1 | 50 ms | 1, 4 | ok | **1, then nothing** | **1, then nothing** |
   | 7 | 3 | 1, 2 (hard) | 1 | None | 3 | ok | **nothing** | ok |
   
   Scenario 7 is the head-of-journal case that #195 fixed; it is included to 
document that #195 did
   not address gaps occurring after the first batch. The first batch is exempt 
from windowing, so
   all failures occur in a later window.
   
   ## Expected behavior
   
   The stream emits every live event in `[fromSequenceNr, toSequenceNr]` 
regardless of gaps, as it
   did before 1.1.0. A batch shorter than `batchSize` must not be treated as 
exhaustion unless the
   window has reached `toSequenceNr`.
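
One way to read this requirement, sketched on an in-memory model: a short batch ends the stream only when the window already extends to `toSequenceNr`; otherwise the next window starts just past the current one. This is an illustration under stated assumptions, not the fix intended for the PR: `GapTolerantReplayModel` and `replay` are hypothetical names, the journal is a list of live sequence numbers, and only `refreshInterval = None` is covered.

```scala
import scala.annotation.tailrec

// In-memory sketch of a gap-tolerant exhaustion check (an assumption,
// not the library code and not necessarily the eventual fix).
object GapTolerantReplayModel {

  def replay(live: Seq[Long], fromSeqNr: Long, toSeqNr: Long, batchSize: Int): Seq[Long] = {
    require(batchSize > 0, "the model only covers positive batch sizes")
    val sorted = live.sorted
    val first = math.max(1L, fromSeqNr)

    @tailrec
    def loop(from: Long, emitted: Vector[Long]): Vector[Long] =
      if (from > toSeqNr) emitted
      else {
        // First batch is unwindowed; later batches cover [from, from + batchSize].
        val upper =
          if (from == first || Long.MaxValue - batchSize < from) toSeqNr
          else math.min(from + batchSize, toSeqNr)
        val batch = sorted.filter(n => n >= from && n <= upper).take(batchSize)
        val out = emitted ++ batch

        if (batch.lastOption.exists(_ >= toSeqNr)) out        // last event delivered
        else if (batch.size == batchSize) loop(batch.last + 1, out) // full batch: continue after it
        else if (upper >= toSeqNr) out                        // short batch, window at the end: exhausted
        else loop(upper + 1, out)                             // short batch inside a gap: move past the window
      }

    loop(first, Vector.empty)
  }
}
```

On this model, scenarios 3 and 5 from the table emit `1, 4` and `1, 2, 4, 6` respectively. The live-query case (scenario 6) would need the analogous change where `nextFrom` is computed, which the sketch does not cover.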
   
   I have a test suite reproducing all seven scenarios above and intend to 
submit a PR with the fix.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

