Copilot commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3345632116


##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +308,123 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef       => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown         =>
+              throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends `AbstractNodeQueue` directly so the queue head atomic and the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the producer hot path).
+     *  - All hot-path state is `private[this]` → direct field access, no accessor methods.
+     *  - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The
+     *    Tuple2 is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell —
+     *    those are amortized across the batch.
+     */
+    // Not marked `private` so that `class StageActor`'s aux constructor (compiled outside of the companion
+    // object on Scala 3) can reference it; the enclosing `object StageActor` is itself private.
+    final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (((ActorRef, Any)) => Unit) {
+
+      // IDLE/SCHEDULED election state. AtomicInteger gives us volatile read + CAS without the cross-Scala
+      // VarHandle / field-updater access fuss; the wrapper costs one extra reference per StageActor, which
+      // is negligible against the per-tell mailbox traffic we are saving.
+      private[this] val state = new AtomicInteger(SchedStateIdle)

Review Comment:
   The PR description claims the lazy dispatch election state is implemented as 
a `@volatile var Int` driven by a `VarHandle` (to avoid per-instance wrapper 
allocations), but the implementation here uses `new AtomicInteger(...)`. Please 
either update the PR description to match the code, or switch the 
implementation to the described `VarHandle` pattern if that is still the 
intended design goal.
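The following sketch models, outside Pekko, the election that the `LazyDispatch` doc comment describes. It may help when weighing `AtomicInteger` against a `VarHandle`. Every tell enqueues, but only the producer that wins the IDLE -> SCHEDULED CAS pays for a mailbox post. The consumer rechecks the queue after publishing IDLE. This is a model under stated assumptions, not the PR's code: `java.util.concurrent.ConcurrentLinkedQueue` stands in for `AbstractNodeQueue`, and an `envelopesPosted` counter stands in for `interpreter.onAsyncInput`. The `Election` class and its members are hypothetical names.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, Pekko-free model of LazyDispatch's election; not a Pekko API.
final class Election[A <: AnyRef] {
  private final val Idle = 0
  private final val Scheduled = 1

  // Stand-in for AbstractNodeQueue: a lock-free FIFO that is safe for many producers.
  val queue = new ConcurrentLinkedQueue[A]()
  private val state = new AtomicInteger(Idle)

  // Counts the mailbox envelopes that would have been posted via onAsyncInput.
  val envelopesPosted = new AtomicInteger(0)

  def tell(msg: A): Unit = {
    queue.add(msg)
    // Double-checked CAS: a plain volatile read filters out losers before any CAS is attempted.
    if (state.get() == Idle && state.compareAndSet(Idle, Scheduled))
      envelopesPosted.incrementAndGet() // only the elected producer "schedules" a drain
  }

  // Consumer side (the interpreter thread in Pekko): drain, publish IDLE, then recheck so that
  // a producer which enqueued after the last poll, but still saw SCHEDULED, is not stranded.
  def drainAll(handler: A => Unit): Unit = {
    var item = queue.poll()
    while (item ne null) {
      handler(item)
      item = queue.poll()
    }
    state.set(Idle)
    if (!queue.isEmpty && state.compareAndSet(Idle, Scheduled))
      envelopesPosted.incrementAndGet()
  }
}
```

In this model, several producer threads can tell concurrently while no drain is running, and `envelopesPosted` stays at 1 however many tells land. That is the amortization the doc comment claims. The CAS protocol is the same whether the state word lives in an `AtomicInteger` or in a `VarHandle`-accessed `@volatile` field. As far as this model shows, the choice the review comment raises affects per-instance footprint, not the logic.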



##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +308,123 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef       => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown         =>
+              throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends `AbstractNodeQueue` directly so the queue head atomic and the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the producer hot path).
+     *  - All hot-path state is `private[this]` → direct field access, no accessor methods.
+     *  - `drainBatchSize` is read once into a stack-local at the top of `drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 1 Tuple2 (~24 bytes). The
+     *    Tuple2 is forced by the public `StageActorRef.Receive` type. No AsyncInput / Envelope per tell —
+     *    those are amortized across the batch.
+     */
+    // Not marked `private` so that `class StageActor`'s aux constructor (compiled outside of the companion
+    // object on Scala 3) can reference it; the enclosing `object StageActor` is itself private.
+    final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (((ActorRef, Any)) => Unit) {
+
+      // IDLE/SCHEDULED election state. AtomicInteger gives us volatile read + CAS without the cross-Scala
+      // VarHandle / field-updater access fuss; the wrapper costs one extra reference per StageActor, which
+      // is negligible against the per-tell mailbox traffic we are saving.
+      private[this] val state = new AtomicInteger(SchedStateIdle)
+
+      // Reused across all drain batches; allocated once at construction.
+      private[this] val drainCallback: Any => Unit = (_: Any) => drain()
+
+      override def apply(pair: (ActorRef, Any)): Unit = {
+        // Producer-side completion guard. `runAsyncInput` short-circuits the drain handler when the stage
+        // is completed, so without this check a CAS-winning producer would leave state=SCHEDULED forever
+        // and subsequent producers would skip the mailbox push, growing the queue unbounded. Dropping
+        // post-completion sends here matches the original per-tell behaviour where `runAsyncInput`
+        // silently ignored them. The read is racy against the interpreter thread that flips
+        // `shutdownCounter`, but eventually visible — enough to bound the leak to messages enqueued
+        // before completion becomes visible to this thread.
+        if (interpreter.isStageCompleted(logic)) return
+        add(pair) // Vyukov producer path: getAndSet + release-store, no CAS spin
+        // Double-checked CAS: uncontended fast path is one volatile read; only the IDLE->SCHEDULED winner
+        // pays a CAS + mailbox push.
+        if (state.get() == SchedStateIdle && state.compareAndSet(SchedStateIdle, SchedStateScheduled)) {
+          // Re-check after winning election: if completion landed between the initial guard and the CAS,
+          // `scheduleDrain` would post an envelope that `runAsyncInput` skips, leaving state=SCHEDULED.
+          // Reset to IDLE and skip the schedule.
+          if (interpreter.isStageCompleted(logic)) state.set(SchedStateIdle)
+          else scheduleDrain()
+        }
+      }
+
+      private def scheduleDrain(): Unit =
+        // 1 AsyncInput + 1 Envelope per drain batch (amortized across up to drainBatchSize tells).
+        interpreter.onAsyncInput(logic, null, NoPromise, drainCallback)
+
+      private def drain(): Unit = {
+        val limit = drainBatchSize // hoisted to a local so JIT treats it as a loop-invariant constant
+        var processed = 0
+        while (processed < limit) {
+          if (interpreter.isStageCompleted(logic)) {
+            // Stage completed mid-drain; drop the remainder (matches the original per-tell behaviour where
+            // runAsyncInput silently skipped completed stages). Don't reschedule — no future drain will run.
+            while (poll() ne null) ()
+            state.set(SchedStateIdle)
+            return
+          }
+          val item = poll()
+          if (item eq null) {
+            state.set(SchedStateIdle)
+            // Recheck race: a producer may have added between `poll == null` and the IDLE publish above.
+            // That producer saw state=SCHEDULED and skipped the mailbox send, so we must re-elect.
+            if (!isEmpty && state.compareAndSet(SchedStateIdle, SchedStateScheduled))
+              scheduleDrain()
+            return
+          }
+          handler(item)
+          processed += 1
+        }
+        // Hit batch cap with items potentially still queued. Re-schedule another envelope so other
+        // BoundaryEvents (pull/push/complete) can interleave via the actor mailbox. `state` stays
+        // SCHEDULED: concurrent producers correctly observe SCHEDULED and skip; the new envelope will drain.
+        scheduleDrain()

Review Comment:
   `LazyDispatch.drain()` always schedules another drain envelope after hitting 
the batch limit, even if the last processed item completed the stage. In that 
case `GraphInterpreter.runAsyncInput` will skip the scheduled drain (because 
the stage is already completed), leaving `state` stuck at SCHEDULED and 
retaining any already-enqueued items until the FunctionRef is stopped. Add a 
completion check before re-scheduling and drop any remaining queued items when 
completed (matching the existing mid-drain completion branch).
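The suggested fix can be pictured with a self-contained model; it is not a patch against `GraphStage.scala`. In the model, the batch-capped drain checks for completion before re-scheduling. On completion it drops the remainder and publishes IDLE, which mirrors the mid-drain branch. The `BatchedDrain` class and its members are all hypothetical. Its `completed` flag stands in for `interpreter.isStageCompleted(logic)`, and its `runMailbox` loop stands in for `runAsyncInput` skipping completed stages.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of LazyDispatch.drain() with the suggested completion check; not Pekko code.
final class BatchedDrain[A <: AnyRef](batchSize: Int, handler: A => Unit) {
  private final val Idle = 0
  private final val Scheduled = 1

  private val queue = new ConcurrentLinkedQueue[A]()
  private val state = new AtomicInteger(Idle)
  // Stand-in for the actor mailbox that carries drain envelopes.
  private val mailbox = new ConcurrentLinkedQueue[Runnable]()

  // Stand-in for interpreter.isStageCompleted(logic).
  @volatile var completed: Boolean = false

  def pending: Int = queue.size
  def isIdle: Boolean = state.get() == Idle

  def tell(msg: A): Unit = {
    if (completed) return // producer-side completion guard
    queue.add(msg)
    if (state.get() == Idle && state.compareAndSet(Idle, Scheduled)) {
      if (completed) state.set(Idle) else scheduleDrain()
    }
  }

  // Stand-in for runAsyncInput: envelopes for a completed stage are skipped, not run.
  def runMailbox(): Unit = {
    var env = mailbox.poll()
    while (env ne null) {
      if (!completed) env.run()
      env = mailbox.poll()
    }
  }

  private def scheduleDrain(): Unit = {
    mailbox.add(() => drain())
    ()
  }

  private def dropRemainderAndIdle(): Unit = {
    while (queue.poll() ne null) ()
    state.set(Idle)
  }

  private def drain(): Unit = {
    val limit = batchSize
    var processed = 0
    while (processed < limit) {
      if (completed) { dropRemainderAndIdle(); return }
      val item = queue.poll()
      if (item eq null) {
        state.set(Idle)
        // Recheck: a producer that enqueued after the empty poll saw SCHEDULED and did not post.
        if (!queue.isEmpty && state.compareAndSet(Idle, Scheduled)) scheduleDrain()
        return
      }
      handler(item)
      processed += 1
    }
    // Suggested fix: the last handled item may have completed the stage. A re-scheduled envelope
    // would then be skipped and leave state at SCHEDULED, so drop the remainder and publish IDLE.
    if (completed) dropRemainderAndIdle()
    else scheduleDrain()
  }
}
```

Consider a batch size of 2 where the second message completes the stage. Without the final check, the model posts one more envelope that `runMailbox` skips, so `state` stays at SCHEDULED and the third message is retained. With the check, the queue ends empty and the state returns to IDLE. When nothing completes, the rescheduled envelopes still process every message in batches.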



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

