The GitHub Actions job "Pull Requests" on pekko.git/optimize-stage-actor-ref-lazy-dispatch has failed. Run started by GitHub user He-Pin (triggered by He-Pin).
Head commit for run:
3ac22644f887b54c125a7ae14abc5b74bd267a01 / He-Pin <[email protected]>
fix: optimize lazy stage actor dispatch via MPSC drain coalescing

Motivation:
Lazy `getStageActor` refs paid one actor mailbox enqueue per external tell:
sender -> FunctionRef -> ConcurrentAsyncCallback.invokeWithPromise -> interpreter self ! AsyncInput.
Under high tell rate to a single stage actor the bottleneck is mailbox traffic (envelope alloc, cross-thread wakeup, dequeue), not the dispatch lambda. Each tell also allocated a Tuple2, an AsyncInput, and a mailbox Envelope.

Modification:
Lazy `getStageActor` now installs an MPSC dispatch (`LazyDispatch`) that:
- enqueues (sender, msg) into a Vyukov MPSC queue (`AbstractNodeQueue`)
- elects a single drain via IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue
- drains on the interpreter thread in a tight loop bounded by `stage-actor-drain-batch` (default 16), then either publishes IDLE (with the canonical recheck race fix) or re-schedules another envelope so other BoundaryEvents interleave naturally via the actor mailbox
- preserves `isStageCompleted` semantics: items added after completion are dropped exactly as the old per-tell path silently skipped them.

The eager construction path (used before stream demand) is unchanged and still routes through the materializer supervisor + `AsyncCallback`.

JIT/GC notes:
- `LazyDispatch` is a `final class` and extends `AbstractNodeQueue` directly so it is its own queue (one fewer allocation and field deref).
- `scheduledState` is a plain `@volatile var Int` driven by a static `VarHandle` (created via `MethodHandles.privateLookupIn`), avoiding the per-instance `AtomicBoolean` wrapper. Same pattern as `AbstractNodeQueue` itself.
- The dispatch `apply` is monomorphic per StageActor instance; the drain callback is allocated once and reused. The FunctionRef lambda is rewritten as `(sender, msg) =>` to skip the Tuple2 allocation on the PoisonPill / Kill warning path.
- Per-tell allocation is now 1 Node + 1 Tuple2 (the Tuple2 is forced by the public `StageActorRef.Receive` type); AsyncInput and Envelope are amortized across the batch.

Configuration:
`pekko.stream.materializer.stage-actor-drain-batch` (default 16) bounds the per-envelope drain. The default aligns with `InputBuffer.max` and keeps the per-actor-wakeup work in the same order of magnitude as the dispatcher throughput; smaller values trade tell throughput for tighter interleaving with upstream/downstream events, larger values do the opposite.

Binary compatibility:
The original 4-arg `private[pekko] StageActor` constructor (`materializer, getAsyncCallback, initialReceive, name`) is preserved as an auxiliary constructor and continues to use the eager `AsyncCallback` path. A new 5-arg `private[pekko]` constructor (`materializer, interpreter, logic, initialReceive, name`) is added for the lazy path. `sbt stream/mimaReportBinaryIssues` passes clean.

Result:
`StageActorRefBenchmark.lazy_stage_actor_ref_tell_10k` (JMH 2 forks x 10 iter x 2s, macOS) - throughput is now bounded by Vyukov enqueue + drain loop rather than per-tell mailbox traffic:

| Variant                          | Throughput (ops/s)   | vs main |
|----------------------------------|----------------------|---------|
| main                             | 6,587,561 +- 616,243 | 1.00x   |
| MPSC + drain coalescing (cap=16) | 13,044,829 +- 1,525K | 1.98x   |
| MPSC + drain coalescing (cap=8)  | 13,589,612 +- 2,114K | 2.06x   |

BroadcastHubBenchmark is unchanged in this measurement (its bottleneck is fan-out broadcasting, not stage-actor tell traffic).

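The IDLE -> SCHEDULED election and bounded drain described under "Modification" can be sketched roughly as below. This is only an illustration of the general technique, not the code from the commit: the class name `CoalescingDispatch` and its methods are hypothetical, `ConcurrentLinkedQueue` stands in for the Vyukov `AbstractNodeQueue`, an `AtomicInteger` stands in for the `VarHandle`-driven `scheduledState`, and a plain `Executor` stands in for the interpreter's actor mailbox. Sender tracking and the `isStageCompleted` check are left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch of drain coalescing; not Pekko's LazyDispatch. */
final class CoalescingDispatch<T> {
    private static final int IDLE = 0;
    private static final int SCHEDULED = 1;

    // Assumption: a general-purpose concurrent queue instead of a Vyukov MPSC queue.
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final AtomicInteger wakeups = new AtomicInteger();
    private final Executor mailbox;     // assumed single-threaded consumer
    private final Consumer<T> handler;  // runs on the consumer thread
    private final int batch;            // upper bound on messages per drain
    private final Runnable drainTask = this::drain; // allocated once, reused

    CoalescingDispatch(Executor mailbox, Consumer<T> handler, int batch) {
        if (batch < 1) throw new IllegalArgumentException("batch must be >= 1");
        this.mailbox = mailbox;
        this.handler = handler;
        this.batch = batch;
    }

    /** Producer side: enqueue, then try to become the one producer that schedules a drain. */
    void tell(T msg) {
        queue.add(msg);
        if (state.compareAndSet(IDLE, SCHEDULED)) {
            mailbox.execute(drainTask);
        }
    }

    /** Number of drain tasks that have run so far. */
    int wakeups() {
        return wakeups.get();
    }

    /** Consumer side: handle at most `batch` messages, then re-schedule or go idle. */
    private void drain() {
        wakeups.incrementAndGet();
        int handled = 0;
        T msg;
        while (handled < batch && (msg = queue.poll()) != null) {
            handler.accept(msg);
            handled++;
        }
        if (handled == batch && !queue.isEmpty()) {
            // Batch exhausted with work left: stay SCHEDULED and queue another
            // drain so other mailbox tasks can run in between.
            mailbox.execute(drainTask);
            return;
        }
        // Publish IDLE, then recheck: a producer may have enqueued after our
        // last poll but before the store, and lost its CAS because the state
        // was still SCHEDULED.
        state.set(IDLE);
        if (!queue.isEmpty() && state.compareAndSet(IDLE, SCHEDULED)) {
            mailbox.execute(drainTask);
        }
    }
}
```

In this sketch, if 40 messages are queued before the consumer runs and `batch` is 16, the mailbox executes three drain tasks (16, 16 and 8 messages) instead of one task per message.
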
Tests:
- sbt "stream / compile" "stream / mimaReportBinaryIssues"
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.StageActorRefSpec" (11/11)
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.ActorRefSinkSpec org.apache.pekko.stream.scaladsl.ActorRefSourceSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSinkSpec org.apache.pekko.stream.scaladsl.ActorRefBackpressureSourceSpec" (42/42)
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.QueueSinkSpec org.apache.pekko.stream.scaladsl.QueueSourceSpec org.apache.pekko.stream.scaladsl.HubSpec" (94/94)
- sbt scalafmt headerCheck
- sbt "bench-jmh / Jmh / run -i 10 -wi 5 -f 2 -r 2s -w 2s .*StageActorRefBenchmark.*"

References:
Refs https://github.com/akka/akka-core/issues/26857 (public issue only; clean-room implementation)

Report URL: https://github.com/apache/pekko/actions/runs/26751148992

With regards,
GitHub Actions via GitBox

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
