The GitHub Actions job "Pull Requests" on 
pekko.git/optimize-internal-concat-value-presented has failed.
Run started by GitHub user He-Pin (triggered by He-Pin).

Head commit for run:
fa3597a502764fdfbcf5baa1a2730f198470afbe / He-Pin <[email protected]>
optimize: extend internalConcat dispatch for value-presented sources

Motivation:
PR #2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the eager / lazy `concat` operators, where the second source is often
a `Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

Modification:
- `Flow.internalConcat` (Scala DSL): when 
`TraversalBuilder.getValuePresentedSource`
  recognises a known kind, fuse with a dedicated lightweight stage instead
  of `Concat`. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-
  circuits to the upstream Source unchanged.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concat(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions it throws happen at
  materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  #2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concat(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream
  `BaseStream.close()` on both exhaustion and downstream cancellation
  (`take(2)` mid-stream), pending future resolved with `Success(null)`
  treated as completion, pending future resolved with `Failure(_)`
  failing the stream, eager failure for 
`Source.never.concat(Source.failed(ex))`,
  eager failure for `Source.never.concat(Source.future(pendingPromise))`
  when the promise later fails, and the existing `eager` / `concatLazy`
  parity matrix.

Result:
The eager and lazy concat operators take the same value-presented fast
path as #2977's flatten operators. No substream is materialized for the
inlined kinds; the Java-stream resource is closed deterministically; the
failed / future eager-failure timing matches the existing concat-graph
path so inputs that never complete cannot hang; the pending-future null
path is consistent with `Source.future` and the flatten inflight path.
All FlowConcat suites pass, plus all FlattenMerge / FlatMapConcat
regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat*
References: PR #2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR #2978 (this work)

Report URL: https://github.com/apache/pekko/actions/runs/26144286530

With regards,
GitHub Actions via GitBox


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to