pjfanning commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3347850380
##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +309,152 @@ object GraphStageLogic {
type Receive = ((ActorRef, Any)) => Unit
}
+ private object StageActor {
+ def localCell(ref: ActorRef, description: String): ActorCell =
+ ref match {
+ case ref: LocalActorRef => ref.underlying
+ case ref: RepointableActorRef =>
+ ref.underlying match {
+ case cell: ActorCell => cell
+ case unknown =>
+ throw new IllegalStateException(s"$description must be a local
actor, was [${unknown.getClass.getName}]")
+ }
+ case unknown =>
+ throw new IllegalStateException(s"$description must be a local
actor, was [${unknown.getClass.getName}]")
+ }
+
+ /**
+ * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the
materializer's ActorSystem config.
+ * Called once per lazy StageActor construction (never on the hot path).
Bounded to `>= 1`.
+ */
+ def drainBatchSize(materializer: Materializer): Int =
+ Math.max(1,
materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+ private final val SchedStateIdle: Int = 0
+ private final val SchedStateScheduled: Int = 1
+
+ /**
+ * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and
elect a single drain via
+ * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox
enqueue. The drain runs on the
+ * interpreter thread, polls in a tight loop bounded by `drainBatchSize`,
then either publishes IDLE
+ * (with a recheck for the publish-window race) or re-schedules another
envelope to yield to other
+ * BoundaryEvents.
+ *
+ * JIT/GC notes:
+ * - `final class` + monomorphic per-StageActor instance → JIT
devirtualizes the apply at the
+ * FunctionRef call site.
+ * - Extends `AbstractNodeQueue` directly so the queue head atomic and
the dispatch function share one
+ * object (one allocation per StageActor, one fewer field deref on the
producer hot path).
+ * - Implements `Any => Unit` directly — serves as both the producer
callback and the drain callback,
+ * eliminating the separate `drainCallback` lambda allocation.
+ * - `state` is a plain `@volatile var Int` driven by a static
`VarHandle` in the companion object
+ * (via `MethodHandles.privateLookupIn`), same pattern as
`AbstractNodeQueue` itself;
+ * avoids per-instance `AtomicInteger`.
+ * - `drainBatchSize` is read once into a stack-local at the top of
`drain` so the JIT can treat the loop
+ * bound as a constant.
+ * - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) +
1 Tuple2 (~24 bytes). The
+ * Tuple2 is forced by the public `StageActorRef.Receive` type. No
AsyncInput / Envelope per tell —
+ * those are amortized across the batch.
+ */
+ // Not marked `private` so that `class StageActor`'s aux constructor
(compiled outside of the companion
+ // object on Scala 3) can reference it; the enclosing `object StageActor`
is itself private.
+ final class LazyDispatch(
+ interpreter: GraphInterpreter,
+ logic: GraphStageLogic,
+ handler: Any => Unit,
+ drainBatchSize: Int)
+ extends AbstractNodeQueue[(ActorRef, Any)]
+ with (Any => Unit) {
+
+ // IDLE/SCHEDULED election state. VarHandle avoids per-instance
AtomicInteger;
+ // the handle lives in the companion object (true JVM static), same
pattern as
+ // AbstractNodeQueue._tailDoNotCallMeDirectly.
+ @volatile var state: Int = SchedStateIdle
+
+ override def apply(msg: Any): Unit = {
+ // Drain path: onAsyncInput passes null as the message when scheduling
a drain envelope.
+ if (msg.asInstanceOf[AnyRef] eq null) { drain(); return }
Review Comment:
Why do you need to cast before checking `eq null`? Can you also avoid `;`
and just use new lines? Can you also avoid the `return` here by using an
`else` on the `if`?
##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +309,152 @@ object GraphStageLogic {
type Receive = ((ActorRef, Any)) => Unit
}
+ private object StageActor {
+ def localCell(ref: ActorRef, description: String): ActorCell =
+ ref match {
+ case ref: LocalActorRef => ref.underlying
+ case ref: RepointableActorRef =>
+ ref.underlying match {
+ case cell: ActorCell => cell
+ case unknown =>
+ throw new IllegalStateException(s"$description must be a local
actor, was [${unknown.getClass.getName}]")
+ }
+ case unknown =>
+ throw new IllegalStateException(s"$description must be a local
actor, was [${unknown.getClass.getName}]")
+ }
+
+ /**
+ * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the
materializer's ActorSystem config.
+ * Called once per lazy StageActor construction (never on the hot path).
Bounded to `>= 1`.
+ */
+ def drainBatchSize(materializer: Materializer): Int =
+ Math.max(1,
materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+ private final val SchedStateIdle: Int = 0
+ private final val SchedStateScheduled: Int = 1
+
+ /**
+ * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and
elect a single drain via
+ * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox
enqueue. The drain runs on the
+ * interpreter thread, polls in a tight loop bounded by `drainBatchSize`,
then either publishes IDLE
+ * (with a recheck for the publish-window race) or re-schedules another
envelope to yield to other
+ * BoundaryEvents.
+ *
+ * JIT/GC notes:
+ * - `final class` + monomorphic per-StageActor instance → JIT
devirtualizes the apply at the
+ * FunctionRef call site.
+ * - Extends `AbstractNodeQueue` directly so the queue head atomic and
the dispatch function share one
+ * object (one allocation per StageActor, one fewer field deref on the
producer hot path).
+ * - Implements `Any => Unit` directly — serves as both the producer
callback and the drain callback,
+ * eliminating the separate `drainCallback` lambda allocation.
+ * - `state` is a plain `@volatile var Int` driven by a static
`VarHandle` in the companion object
+ * (via `MethodHandles.privateLookupIn`), same pattern as
`AbstractNodeQueue` itself;
+ * avoids per-instance `AtomicInteger`.
+ * - `drainBatchSize` is read once into a stack-local at the top of
`drain` so the JIT can treat the loop
+ * bound as a constant.
+ * - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) +
1 Tuple2 (~24 bytes). The
+ * Tuple2 is forced by the public `StageActorRef.Receive` type. No
AsyncInput / Envelope per tell —
+ * those are amortized across the batch.
+ */
+ // Not marked `private` so that `class StageActor`'s aux constructor
(compiled outside of the companion
+ // object on Scala 3) can reference it; the enclosing `object StageActor`
is itself private.
+ final class LazyDispatch(
+ interpreter: GraphInterpreter,
+ logic: GraphStageLogic,
+ handler: Any => Unit,
+ drainBatchSize: Int)
+ extends AbstractNodeQueue[(ActorRef, Any)]
+ with (Any => Unit) {
+
+ // IDLE/SCHEDULED election state. VarHandle avoids per-instance
AtomicInteger;
+ // the handle lives in the companion object (true JVM static), same
pattern as
+ // AbstractNodeQueue._tailDoNotCallMeDirectly.
+ @volatile var state: Int = SchedStateIdle
+
+ override def apply(msg: Any): Unit = {
+ // Drain path: onAsyncInput passes null as the message when scheduling
a drain envelope.
+ if (msg.asInstanceOf[AnyRef] eq null) { drain(); return }
+ // Producer path: msg is an (ActorRef, Any) tuple from the FunctionRef.
+ val pair = msg.asInstanceOf[(ActorRef, Any)]
+ // Producer-side completion guard. `runAsyncInput` short-circuits the
drain handler when the stage
+ // is completed, so without this check a CAS-winning producer would
leave state=SCHEDULED forever
+ // and subsequent producers would skip the mailbox push, growing the
queue unbounded. Dropping
+ // post-completion sends here matches the original per-tell behaviour
where `runAsyncInput`
+ // silently ignored them. The read is racy against the interpreter
thread that flips
+ // `shutdownCounter`, but eventually visible — enough to bound the
leak to messages enqueued
+ // before completion becomes visible to this thread.
+ if (interpreter.isStageCompleted(logic)) return
+ add(pair) // Vyukov producer path: getAndSet + release-store, no CAS
spin
+ // Double-checked CAS: uncontended fast path is one volatile read;
only the IDLE->SCHEDULED winner
+ // pays a CAS + mailbox push.
+ val u = LazyDispatch.stateHandle
+ // Scala 3's strict inference cannot pick the Int-returning
signature-polymorphic overload of
+ // `VarHandle.get` without explicit return-type context — a typed
local witnesses it. On Scala 2
+ // this is a no-op. Keep the double-checked plain read fast path:
under producer contention
+ // only the IDLE→SCHEDULED winner pays a CAS, the rest just read.
+ val cur: Int = u.get(this)
+ if (cur == SchedStateIdle && u.compareAndSet(this, SchedStateIdle,
SchedStateScheduled)) {
+ // Re-check after winning election: if completion landed between the
initial guard and the CAS,
+ // `scheduleDrain` would post an envelope that `runAsyncInput`
skips, leaving state=SCHEDULED.
+ // Reset to IDLE and skip the schedule.
+ if (interpreter.isStageCompleted(logic)) u.set(this, SchedStateIdle)
+ else scheduleDrain()
+ }
+ }
+
+ private def scheduleDrain(): Unit =
+ // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to
drainBatchSize tells).
+ // `this` serves as the drain callback (Any => Unit); onAsyncInput
calls apply(null).
+ interpreter.onAsyncInput(logic, null, NoPromise, this)
+
+ private def drain(): Unit = {
+ val u = LazyDispatch.stateHandle
+ val limit = drainBatchSize // hoisted to a local so JIT treats it as a
loop-invariant constant
+ var processed = 0
+ while (processed < limit) {
+ if (interpreter.isStageCompleted(logic)) {
+ // Stage completed mid-drain; drop the remainder (matches the
original per-tell behaviour where
+ // runAsyncInput silently skipped completed stages). Don't
reschedule — no future drain will run.
+ while (poll() ne null) ()
+ u.set(this, SchedStateIdle)
+ return
+ }
+ val item = poll()
+ if (item eq null) {
+ u.set(this, SchedStateIdle)
+ // Recheck race: a producer may have added between `poll == null`
and the IDLE publish above.
+ // That producer saw state=SCHEDULED and skipped the mailbox send,
so we must re-elect.
+ if (!isEmpty && u.compareAndSet(this, SchedStateIdle,
SchedStateScheduled))
+ scheduleDrain()
+ return
+ }
+ handler(item)
+ processed += 1
+ }
+ // Hit batch cap with items potentially still queued. The last
`handler(item)` call may have
+ // completed the stage (e.g. user code called `completeStage()`); a
fresh `scheduleDrain` would
+ // post an envelope that `runAsyncInput` skips, leaving
state=SCHEDULED forever. Mirror the
+ // mid-loop branch: drain remainder, publish IDLE, do not reschedule.
+ if (interpreter.isStageCompleted(logic)) {
+ while (poll() ne null) ()
+ u.set(this, SchedStateIdle)
+ return
+ }
+ // Re-schedule another envelope so other BoundaryEvents
(pull/push/complete) can interleave via
+ // the actor mailbox. `state` stays SCHEDULED: concurrent producers
observe SCHEDULED and skip;
+ // the new envelope will drain.
+ scheduleDrain()
+ }
+ }
+
+ object LazyDispatch {
+ private val stateHandle: VarHandle = {
+ val lookup = MethodHandles.privateLookupIn(classOf[LazyDispatch],
MethodHandles.lookup())
Review Comment:
This is our own code - why do we need to use var handles? Can't we just
change LazyDispatch to expose methods that work with the `state` field?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]