The GitHub Actions job "Pull Requests" on 
pekko.git/optimize-internal-concat-value-presented has succeeded.
Run started by GitHub user He-Pin (triggered by He-Pin).

Head commit for run:
dd8e3c6e2ad7b4e8a08cc5c53f082972416ad308 / He-Pin <[email protected]>
optimize: extend internalConcat dispatch for value-presented sources

Motivation:
PR #2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the eager / lazy `concat` operators, where the second source is often
a `Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.
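
For illustration, the concat shapes described above look like this at the
user level. This is a minimal sketch that assumes `pekko-stream` is on the
classpath; the object name `ConcatShapes` and the sample values are
hypothetical. The observable output is the same whether or not the fast
path is taken, since the optimization only changes how the second source
is run.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Pekko.
object ConcatShapes {
  // Runs each concat variant to completion and returns the collected elements.
  def run(): Map[String, Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("concat-shapes")
    try {
      val upstream = Source(List(1, 2))
      val variants = Map(
        "single" -> upstream.concat(Source.single(3)),
        "iterable" -> upstream.concat(Source(List(3, 4))),
        "range" -> upstream.concat(Source(3 to 5)),
        "empty" -> upstream.concat(Source.empty[Int]),
        "lazy-single" -> upstream.concatLazy(Source.single(3)))
      variants.map { case (name, source) =>
        name -> Await.result(source.runWith(Sink.seq), 5.seconds)
      }
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```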

Modification:
- `Flow.internalConcat` (Scala DSL): when
  `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse
  with a dedicated lightweight stage instead
  of `Concat`. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-
  circuits to the upstream Source unchanged. The inlined source's wrapping
  attributes (from `Source.foo(...).withAttributes(...)`) are carried over
  to the optimized stage via `addAttributes(other.traversalBuilder.attributes)`
  so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
  or stage name still applies on the fast path, preserving behavioural
  parity with the substream-materializing `concatGraph` path.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concat(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions from `open()` happen
  at materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  #2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concat(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- `IterableConcat` reads
  `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
  and applies it on iterator-creation / iterator-iteration failure,
  mirroring `IterableSource` / `IteratorSource` semantics. Without this,
  a Resume / Restart decider attached to the inlined source via
  `withAttributes(supervisionStrategy(...))` would silently degrade to
  Stop on the optimized path. Restart re-invokes the iterator factory and
  resumes emission with a fresh iterator.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream
  `BaseStream.close()` on both exhaustion and downstream cancellation
  (`take(2)` mid-stream), pending future resolved with `Success(null)`
  treated as completion, pending future resolved with `Failure(_)`
  failing the stream, eager failure for
  `Source.never.concat(Source.failed(ex))`,
  eager failure for `Source.never.concat(Source.future(pendingPromise))`
  when the promise later fails, and supervision parity for the
  `IterableConcat` fast path: Resume-skips-throwing-element, Restart-
  recreates-iterator-after-throw, and default-Stop-fails. Existing
  `eager` / `concatLazy` parity matrix is preserved.
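
The eager-failure timing described for `FailedConcat` and `FutureConcat`
can be observed from user code roughly as follows. This is a sketch
assuming `pekko-stream` on the classpath; `EagerFailure` and `Outcomes`
are hypothetical names. Per the description above, both streams are
expected to fail with the supplied exception rather than hang until the
timeout, even though the upstream never completes.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, not part of Pekko.
object EagerFailure {
  final case class Outcomes(failedSource: Try[Any], failedFuture: Try[Any])

  def run(): Outcomes = {
    implicit val system: ActorSystem = ActorSystem("eager-failure")
    try {
      val boom = new RuntimeException("boom")

      // Upstream never completes; the second source is already failed.
      val viaFailed =
        Source.never[Int].concat(Source.failed[Int](boom)).runWith(Sink.ignore)

      // The future is still pending at materialization and fails afterwards.
      val promise = Promise[Int]()
      val viaFuture =
        Source.never[Int].concat(Source.future(promise.future)).runWith(Sink.ignore)
      promise.failure(boom)

      // A hang would surface here as a Failure holding a TimeoutException.
      Outcomes(
        Try(Await.result(viaFailed, 5.seconds)),
        Try(Await.result(viaFuture, 5.seconds)))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```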

Result:
The eager and lazy concat operators take the same value-presented fast
path as #2977's flatten operators. No substream is materialized for the
inlined kinds; the Java-stream resource is closed deterministically; the
failed / future eager-failure timing matches the existing concat-graph
path so inputs that never complete cannot hang; the pending-future null
path is consistent with `Source.future` and the flatten inflight path;
and supervision deciders attached to inlined iterables / iterators flow
through to the optimized stage, so `Source.fromIterator(...).withAttributes(...)`
behaves identically whether dispatched through this fast path or through
the substream path. All FlowConcat suites pass, plus all FlattenMerge /
FlatMapConcat regression tests.
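
One indirect way to check the supervision parity from user code is to run
an iterator source with an attached decider on its own and again as the
second argument of `concat`, then compare the elements. This is a sketch
under assumptions: `pekko-stream` is on the classpath, `SupervisionParity`
is a hypothetical name, and the throwing iterator is made up for
illustration. It does not select a dispatch path; it only checks that the
attributes attached to the inner source still take effect after `concat`.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ ActorAttributes, Supervision }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, not part of Pekko.
object SupervisionParity {
  // Returns (inner source run alone, concatenated run minus the upstream prefix).
  // A failed or timed-out run is represented as None.
  def run(): (Option[Seq[Int]], Option[Seq[Int]]) = {
    implicit val system: ActorSystem = ActorSystem("supervision-parity")
    try {
      // Made-up inner source: the iterator throws while producing element 2.
      val inner = Source
        .fromIterator(() =>
          Iterator(1, 2, 3).map { n =>
            if (n == 2) throw new IllegalStateException("bad element") else n
          })
        .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

      val alone = Try(Await.result(inner.runWith(Sink.seq), 5.seconds)).toOption
      val concatenated =
        Try(Await.result(Source.single(0).concat(inner).runWith(Sink.seq), 5.seconds)).toOption

      // Drop the single upstream element so both sides cover only the inner source.
      (alone, concatenated.map(_.drop(1)))
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```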

Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat*
References: PR #2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR #2978 (this work)

Report URL: https://github.com/apache/pekko/actions/runs/26147429666

With regards,
GitHub Actions via GitBox


