He-Pin commented on code in PR #517:
URL:
https://github.com/apache/pekko-persistence-jdbc/pull/517#discussion_r3370869094
##########
core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala:
##########
@@ -42,62 +42,102 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
}
/**
- * separate this method for unit tests.
+ * Separate this method for unit tests.
*/
@InternalApi
private[dao] def internalBatchStream(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
batchSize: Int,
- refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
- val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
+ refreshInterval: Option[(FiniteDuration, Scheduler)]
+ ): Source[Seq[Try[(PersistentRepr, Long)]], NotUsed] = {
+
+ val normalizedBatchSize = Math.max(1, batchSize)
+ val normalizedFromSequenceNr = Math.max(1, fromSequenceNr)
+
+ def pollOrComplete(from: Long): QueryPlan = refreshInterval match {
+ case Some((delay, scheduler)) => PollRemaining(from, delay, scheduler)
+ case None => Complete
+ }
+
+ // A windowed query spans [from, min(from + batchSize, toSequenceNr)]; at full width that is
+ // batchSize + 1 sequence numbers, which lets a window tolerate one missing sequence number
+ // and still return a full batch.
+ def windowedQuery(from: Long): QueryWindow = {
+ val endInclusive =
+ if (from <= (Long.MaxValue - normalizedBatchSize)) Math.min(from + normalizedBatchSize, toSequenceNr)
+ else toSequenceNr // from + batchSize overflows; fall back to the full remaining range
+ QueryWindow(from, endInclusive)
+ }
+
+ def retrieveBatch(from: Long, endInclusive: Long): Future[Option[(QueryPlan, Seq[Try[(PersistentRepr, Long)]])]] =
+ messages(persistenceId, from, endInclusive, normalizedBatchSize).runWith(Sink.seq).map { batch =>
+ // Messages are ordered by sequence number, therefore the last one is the largest
+ val lastSeqNrInBatch: Option[Long] = batch.lastOption match {
+ case Some(Success((repr, _))) => Some(repr.sequenceNr)
+ case Some(Failure(cause)) => throw cause // fail the returned Future
+ case None => None
+ }
+ val hasReachedToSequenceNr = lastSeqNrInBatch.exists(_ >= toSequenceNr)
+ val isFullBatch = batch.size == normalizedBatchSize
+ val wasWindowedQuery = endInclusive < toSequenceNr
+ // fromSequenceNr beyond toSequenceNr requests an empty range, which must complete
+ // rather than poll a range that can never produce messages
+ val isEmptyRange = from > toSequenceNr
+ val nextFrom: Long = lastSeqNrInBatch.map(_ + 1).getOrElse(from)
+
+ // Deleted messages leave gaps, so a window may hold fewer live messages than batchSize
+ // even when more messages exist beyond it. A short batch is conclusive only when the
+ // queried range reached toSequenceNr, otherwise requery the full remaining range.
+ val nextQueryPlan: QueryPlan =
+ if (hasReachedToSequenceNr || isEmptyRange) Complete
+ else if (isFullBatch) windowedQuery(nextFrom)
+ else if (wasWindowedQuery) QueryRemaining(nextFrom)
+ else pollOrComplete(nextFrom)
+
+ Some((nextQueryPlan, batch))
+ }
+
+ // A full-range first query crosses a leading gap (e.g. journal purged up to a snapshot)
+ // in a single round trip; the LIMITed query costs the same as a windowed one when dense.
+ // Once a full batch shows the journal to be dense, reads switch to bounded windows, the
+ // perf safeguard introduced by PR #180, and fall back to a full-range query on a gap.
Source
- .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
- case (from, control) =>
- def limitWindow(from: Long): Long = {
- if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
- toSequenceNr
- } else {
- Math.min(from + batchSize, toSequenceNr)
- }
- }
-
- def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
- for {
- xs <- messages(persistenceId, from, limitWindow(from), batchSize).runWith(Sink.seq)
- } yield {
- val hasMoreEvents = xs.size == batchSize
- // Events are ordered by sequence number, therefore the last one is the largest)
- val lastSeqNrInBatch: Option[Long] = xs.lastOption match {
- case Some(Success((repr, _))) => Some(repr.sequenceNr)
- case Some(Failure(e)) => throw e // fail the returned Future
- case None => None
- }
- val hasLastEvent = lastSeqNrInBatch.exists(_ >= toSequenceNr)
- val nextControl: FlowControl =
- if (hasLastEvent || from > toSequenceNr) Stop
- else if (hasMoreEvents) Continue
- else if (refreshInterval.isEmpty) Stop
- else ContinueDelayed
-
- val nextFrom: Long = lastSeqNrInBatch match {
- // Continue querying from the last sequence number (the events are ordered)
- case Some(lastSeqNr) => lastSeqNr + 1
- case None => from
- }
- Some(((nextFrom, nextControl), xs))
- }
- }
-
- control match {
- case Stop => Future.successful(None)
- case Continue => retrieveNextBatch()
- case ContinueDelayed =>
- val (delay, scheduler) = refreshInterval.get
- pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
- }
+ .unfoldAsync[QueryPlan, Seq[Try[(PersistentRepr, Long)]]](QueryRemaining(normalizedFromSequenceNr)) {
+ case Complete =>
+ Future.successful(None)
+ case QueryWindow(from, endInclusive) =>
+ retrieveBatch(from, endInclusive)
+ case QueryRemaining(from) =>
+ retrieveBatch(from, toSequenceNr)
+ case PollRemaining(from, delay, scheduler) =>
+ pekko.pattern.after(delay, scheduler)(retrieveBatch(from, toSequenceNr))
}
}
+}
+
+private[dao] object BaseJournalDaoWithReadMessages {
+
+ /** The query the batch stream will run. */
+ private sealed trait QueryPlan
Review Comment:
QueryAction?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]