He-Pin opened a new pull request, #3009: URL: https://github.com/apache/pekko/pull/3009
## Motivation

`FlowMapAsyncPartitionedSpec` *"resume after multiple failures if resume supervision is in place"* intermittently hung for 60s and failed with a `ScalaFutures` timeout:

```
A timeout occurred waiting for a future to complete. Waited 60000000000 nanoseconds. (FlowMapAsyncPartitionedSpec.scala:398)
```

This is the same completion-path family as the `drainQueue` concurrent-modification race fixed in #2899 (see #2903).

## Modification

`pushNextIfPossibleOrdered`'s non-empty-`partitionsInProgress` branch ended with only `drainQueue()`, never calling `pullIfNeeded()`. Inside its `while` loop, only the `Success` branch calls `pullIfNeeded()` (which holds the sole `completeStage()` check on the push path); the `Failure`/`Supervision.Resume` branch dequeues elements **without** any completion check.

As a result, when the **final** buffered elements are resumed failures, the buffer empties while upstream has already finished, but `completeStage()` is never invoked: the stage hangs and the downstream future never completes.

The fix adds a trailing `pullIfNeeded()` after `drainQueue()` in the ordered branch, mirroring `pushNextIfPossibleUnordered`, which already had it. `pullIfNeeded()` is O(1), allocation-free and runs once per async callback (outside the per-element loop), so there is no hot-path or JIT impact.

This explains why only this test triggered it: `resume after failed future` (1→5) ends with a success (element 5 → `Success` branch → completes), whereas `resume after multiple failures` (1→10) ends with elements 8/9/10 all failing (`%4 < 3`), so the last dequeue is a resumed failure.

## Result

The stage now completes once the buffer drains after upstream finish, regardless of whether the last elements succeed or are resumed failures.

## Tests

`sbt "stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowMapAsyncPartitionedSpec"`: previously timing-out test now passes in ~1ms; all 18 specs green across 5 consecutive runs.
```
- must resume after multiple failures if resume supervision is in place (1 millisecond)
Tests: succeeded 18, failed 0, canceled 0, ignored 0, pending 0
```

Internal API only (`@InternalApi private[stream]`); no public API / binary-compatibility impact.

## References

- #2903
- #2899
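
## Illustrative sketch

The completion gap described under Modification can be reproduced in a toy model. The sketch below is not Pekko's code: `OrderedStageModel`, `CompletionGapDemo`, `trailingCompletionCheck`, `enqueue`, `finishUpstream`, `completed` and `emitted` are hypothetical names, the buffer is a plain queue of already-completed results, and `drainQueue()`, demand handling and partitioning are omitted. It only assumes what the description states: the success branch checks for completion, the resumed-failure branch does not, and the fix adds one check after the loop.

```scala
import scala.collection.mutable
import scala.util.{ Failure, Success, Try }

// Hypothetical, simplified model of the ordered push path; not Pekko's implementation.
final class OrderedStageModel(trailingCompletionCheck: Boolean) {
  private val buffer = mutable.Queue.empty[Try[Int]]
  private var upstreamFinished = false

  var completed = false                        // stands in for completeStage() having run
  val emitted = mutable.ListBuffer.empty[Int]  // elements pushed downstream

  def enqueue(result: Try[Int]): Unit = buffer.enqueue(result)
  def finishUpstream(): Unit = upstreamFinished = true

  // Stand-in for pullIfNeeded(): the only completion check on the push path.
  private def pullIfNeeded(): Unit =
    if (upstreamFinished && buffer.isEmpty) completed = true

  // Stand-in for pushNextIfPossibleOrdered, run once per async callback.
  def pushNextIfPossibleOrdered(): Unit = {
    while (buffer.nonEmpty) {
      buffer.dequeue() match {
        case Success(value) =>
          emitted += value
          pullIfNeeded() // success branch: completion is checked
        case Failure(_) =>
          () // resumed failure: element dropped, no completion check
      }
    }
    // The trailing call that the fix adds after the loop.
    if (trailingCompletionCheck) pullIfNeeded()
  }
}

object CompletionGapDemo {
  // Feeds elements 1 to `last`; an element fails when n % 4 < 3.
  def run(last: Int, trailingCompletionCheck: Boolean): OrderedStageModel = {
    val stage = new OrderedStageModel(trailingCompletionCheck)
    (1 to last).foreach { n =>
      val result: Try[Int] =
        if (n % 4 < 3) Failure(new RuntimeException(s"element $n failed"))
        else Success(n)
      stage.enqueue(result)
    }
    stage.finishUpstream()
    stage.pushNextIfPossibleOrdered()
    stage
  }

  def main(args: Array[String]): Unit = {
    // 1 to 10 ends with failures 8, 9, 10: the model never completes without the trailing check.
    println(run(10, trailingCompletionCheck = false).completed) // false
    println(run(10, trailingCompletionCheck = true).completed)  // true
    // 1 to 7 ends with a success (7), so the success branch completes the model on its own.
    println(run(7, trailingCompletionCheck = false).completed)  // true
  }
}
```

In this model the check after the loop runs once per call rather than once per element, which matches the reasoning above that the added `pullIfNeeded()` stays off the per-element path.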
