The GitHub Actions job "Pull Requests" on pekko.git/optimize-stage-actor-ref-lazy-dispatch has failed.
Run started by GitHub user He-Pin (triggered by He-Pin).

Head commit for run:
a6c91c96faf4aeb890cf44709353585d89ce1377 / He-Pin <[email protected]>
fix: optimize lazy stage actor dispatch via MPSC drain coalescing

Motivation:

Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell:
sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise ->
interpreter self ! AsyncInput. Under a high tell rate to a single stage
actor, the bottleneck is mailbox traffic (envelope alloc, cross-thread
wakeup, dequeue), not the dispatch lambda. Each tell also allocated a Tuple2,
an AsyncInput, and a mailbox Envelope.

Modification:

Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that:
  - enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`)
  - elects a single drain via IDLE -> SCHEDULED CAS; only the elected
    producer pays a mailbox enqueue
  - drains on the interpreter thread in a tight loop bounded by
    `stage-actor-drain-batch` (default 16), then either publishes IDLE
    (with the canonical recheck race fix) or re-schedules another envelope
    so other BoundaryEvents interleave naturally via the actor mailbox
  - preserves `isStageCompleted` semantics: items added after completion are
    dropped exactly as the old per-tell path silently skipped them.
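
The elect-then-drain protocol described in the list above can be sketched in
isolation. This is an illustrative model only, not the `LazyDispatch` code
from this commit: the class name `CoalescingDispatch` is hypothetical, a
`ConcurrentLinkedQueue` stands in for the Vyukov `AbstractNodeQueue`, an
`AtomicInteger` stands in for the `VarHandle`-driven state field, and a plain
`Queue<Runnable>` stands in for the actor mailbox. `isStageCompleted`
handling is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of drain coalescing; names are not taken from Pekko.
final class CoalescingDispatch<T> {
    private static final int IDLE = 0;
    private static final int SCHEDULED = 1;

    // Assumption: stand-in for the Vyukov MPSC queue used by the commit.
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final AtomicInteger envelopes = new AtomicInteger();
    private final Queue<Runnable> mailbox; // stand-in for the actor mailbox
    private final Consumer<T> handler;
    private final int batch;
    private final Runnable drainTask = this::drain; // allocated once, reused

    CoalescingDispatch(Queue<Runnable> mailbox, Consumer<T> handler, int batch) {
        this.mailbox = mailbox;
        this.handler = handler;
        this.batch = batch;
    }

    // Producer side: always enqueue; only the CAS winner pays for an envelope.
    void tell(T msg) {
        queue.add(msg);
        if (state.compareAndSet(IDLE, SCHEDULED)) schedule();
    }

    int envelopes() { return envelopes.get(); }

    private void schedule() {
        envelopes.incrementAndGet();
        mailbox.add(drainTask);
    }

    // Consumer side: runs on the single consumer thread, bounded by `batch`.
    private void drain() {
        int n = 0;
        T msg;
        while (n < batch && (msg = queue.poll()) != null) {
            handler.accept(msg);
            n++;
        }
        if (!queue.isEmpty()) {
            // Work remains: stay SCHEDULED and yield via a fresh envelope
            // so other mailbox events can interleave.
            schedule();
            return;
        }
        state.set(IDLE);
        // Recheck: a producer may have enqueued after the emptiness check
        // but lost its CAS because the state was still SCHEDULED.
        if (!queue.isEmpty() && state.compareAndSet(IDLE, SCHEDULED)) schedule();
    }

    public static void main(String[] args) {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<Integer> seen = new ArrayList<>();
        CoalescingDispatch<Integer> d = new CoalescingDispatch<>(mailbox, seen::add, 16);
        for (int i = 0; i < 40; i++) d.tell(i);
        while (!mailbox.isEmpty()) mailbox.poll().run();
        System.out.println(seen.size() + " messages, " + d.envelopes() + " envelopes");
    }
}
```

In this model, 40 tells issued before the first drain cost three envelopes
with a cap of 16, where a per-tell path would cost 40.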

The eager construction path (used before stream demand) is unchanged and
still routes through the materializer supervisor + `AsyncCallback`.

JIT/GC notes:
  - `LazyDispatch` is a `final class` and extends `AbstractNodeQueue`
    directly so it is its own queue (one fewer allocation and field deref).
  - `scheduledState` is a plain `@volatile var Int` driven by a static
    `VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding the
    per-instance `AtomicBoolean` wrapper. Same pattern as
    `AbstractNodeQueue` itself.
  - The dispatch `apply` is monomorphic per StageActor instance; the drain
    callback is allocated once and reused. The FunctionRef lambda is
    rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the
    PoisonPill / Kill warning path.
  - Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by
    the public `StageActorRef.Receive` type); AsyncInput and Envelope are
    amortized across the batch.
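
For readers unfamiliar with the static `VarHandle` pattern mentioned above,
a minimal sketch follows. It is an assumption-laden illustration in Java,
not the Scala field from this commit: the class `ScheduledFlag`, its method
names, and the field name `state` are all hypothetical.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical illustration: one static VarHandle drives a plain volatile
// int field, so no per-instance atomic wrapper object is allocated.
final class ScheduledFlag {
    private static final int IDLE = 0;
    private static final int SCHEDULED = 1;

    private static final VarHandle STATE;

    static {
        try {
            STATE = MethodHandles
                .privateLookupIn(ScheduledFlag.class, MethodHandles.lookup())
                .findVarHandle(ScheduledFlag.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @SuppressWarnings("unused") // read and written through STATE
    private volatile int state = IDLE;

    // Returns true for exactly one caller per IDLE -> SCHEDULED transition.
    boolean trySchedule() {
        return STATE.compareAndSet(this, IDLE, SCHEDULED);
    }

    // Publishes IDLE with volatile semantics so producers can elect again.
    void publishIdle() {
        STATE.setVolatile(this, IDLE);
    }

    boolean isScheduled() {
        return (int) STATE.getVolatile(this) == SCHEDULED;
    }
}
```

The handle is resolved once per class, so each instance carries only the
`int` field itself.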

Configuration:

`pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the
per-envelope drain. The default aligns with `InputBuffer.max` and keeps the
per-actor-wakeup work in the same order of magnitude as the dispatcher
throughput; smaller values trade tell throughput for tighter interleaving
with upstream/downstream events, larger values do the opposite.

Binary compatibility:

The original 4-arg `private[pekko] StageActor` constructor
(`materializer, getAsyncCallback, initialReceive, name`) is preserved as
an auxiliary constructor and continues to use the eager
`AsyncCallback` path. A new 5-arg `private[pekko]` constructor
(`materializer, interpreter, logic, initialReceive, name`) is added for the
lazy path. `sbt stream/mimaReportBinaryIssues` passes clean.

Result:

`StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks x 10
iter x 2s, macOS) - throughput is now bounded by Vyukov enqueue + drain
loop rather than per-tell mailbox traffic:

| Variant                          | Throughput (ops/s)   | vs main |
|----------------------------------|----------------------|---------|
| main                             | 6,587,561 +- 616,243 | 1.00x   |
| MPSC + drain coalescing (cap=16) | 13,044,829 +- 1,525K | 1.98x   |
| MPSC + drain coalescing (cap=8)  | 13,589,612 +- 2,114K | 2.06x   |

BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is
fan-out broadcasting, not stage-actor tell traffic).

Tests:

- sbt "stream / compile" "stream / mimaReportBinaryIssues"
- sbt "stream-tests / Test / testOnly
  org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11)
- sbt "stream-tests / Test / testOnly
  org.apache.pekko.stream.scaladsl.ActorRefSinkSpec
  org.apache.pekko.stream.scaladsl.ActorRefSourceSpec
  org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec
  org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42)
- sbt "stream-tests / Test / testOnly
  org.apache.pekko.stream.scaladsl.QueueSinkSpec
  org.apache.pekko.stream.scaladsl.QueueSourceSpec
  org.apache.pekko.stream.scaladsl.HubSpec" (94/94)
- sbt scalafmt headerCheck
- sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s
  .*StageActorRefBenchmark.*"

References:

Refs https://github.com/akka/akka-core/issues/26857 (public issue only;
clean-room implementation)

Report URL: https://github.com/apache/pekko/actions/runs/26752984433

With regards,
GitHub Actions via GitBox


