The GitHub Actions job "Nightly Builds (1.7)" on pekko.git/main has failed.
Run started by GitHub user pjfanning (triggered by pjfanning).

Head commit for run:
415c4ae76024b2e4d1873efc9c028c9ea838757a / He-Pin(kerr) <[email protected]>
optimize: extend internalConcat dispatch for value-presented sources (#2978)

* optimize: extend internalConcat dispatch for value-presented sources

Motivation:
PR #2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the lazy `concat` operator, where the second source is often a
`Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

`concat` (the eager / detached variant) intentionally retains the
substream-materializing path so that `Concat(_, detachedInputs = true)`
+ `Detacher` semantics — eager pull at materialization, one-element
prefetch buffer, deadlock-breaking for cyclic graphs — are preserved.

Modification:
- `Flow.internalConcat` (Scala DSL): when `detached = false` (i.e.
  `concatLazy`) and `TraversalBuilder.getValuePresentedSource` recognises
  a known kind, fuse with a dedicated lightweight stage instead of
  `Concat`. When `detached = true` (i.e. `concat`), fall through to
  `concatGraph` to keep the eager-pull / prefetch / cycle-deadlock
  contract intact. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `JavaStreamConcat`, `RepeatConcat`,
  `FailedConcat`, `FutureConcat`. `Source.empty` short-circuits to the
  upstream Source unchanged. The inlined source's wrapping attributes
  (from `Source.foo(...).withAttributes(...)`) are carried over to the
  optimized stage via `addAttributes(other.traversalBuilder.attributes)`
  so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
  or stage name still applies on the fast path.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concatLazy(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions from `open()` happen
  at materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  #2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concatLazy(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- `IterableConcat` reads 
`inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
  and applies it on iterator-creation / iterator-iteration failure,
  mirroring `IterableSource` / `IteratorSource` semantics. Without this,
  a Resume / Restart decider attached to the inlined source via
  `withAttributes(supervisionStrategy(...))` would silently degrade to
  Stop on the optimized path. Restart re-invokes the iterator factory and
  resumes emission with a fresh iterator.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`,
  `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on
  both exhaustion and downstream cancellation (`take(2)` mid-stream),
  pending future resolved with `Success(null)` treated as completion,
  pending future resolved with `Failure(_)` failing the stream, eager
  failure for `Source.never.concatLazy(Source.failed(ex))`, eager failure
  for `Source.never.concatLazy(Source.future(pendingPromise))` when the
  promise later fails, supervision parity for the `IterableConcat` fast
  path (Resume-skips, Restart-recreates, default-Stop-fails), and the
  detached-gating: a directional test asserts the fast path is not taken
  for `concat` (detached = true) and another asserts that `concat`
  preserves Detacher's eager pull side-effect timing for an
  `IteratorSource` factory while `concatLazy` defers it. Existing eager
  / concatLazy parity matrix is preserved by gating each
  `pendingBuilder.toString` assertion on `!eager`.

Result:
The lazy concat operator takes the same value-presented fast path as
#2977's flatten operators while the eager / detached `concat` keeps its
documented Detacher semantics. No substream is materialized for the
inlined kinds on the lazy path; the Java-stream resource is closed
deterministically; the failed / future eager-failure timing matches the
existing concat-graph path so inputs that never complete cannot hang;
the pending-future null path is consistent with `Source.future` and the
flatten inflight path; supervision deciders attached to inlined
iterables / iterators flow through to the optimized stage; and the
detached-true contract is preserved bit-for-bit by falling through to
`concatGraph`. All FlowConcat suites pass, plus all FlattenMerge /
FlatMapConcat regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *Concat* *FlattenMerge* 
*FlatMapConcat*
References: PR #2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR #2978 (this work)

* .

Report URL: https://github.com/apache/pekko/actions/runs/26199489893

With regards,
GitHub Actions via GitBox


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to