Copilot commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3340382157


##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +308,110 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef       => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown         =>
+              throw new IllegalStateException(s"$description must be a local 
actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local 
actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the 
materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). 
Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, 
materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and 
elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox 
enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, 
then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another 
envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT 
devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends `AbstractNodeQueue` directly so the queue head atomic and 
the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the 
producer hot path).
+     *  - All hot-path state is `private[this]` → direct field access, no 
accessor methods.
+     *  - `drainBatchSize` is read once into a stack-local at the top of 
`drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 
1 Tuple2 (~24 bytes). The
+     *    Tuple2 is forced by the public `StageActorRef.Receive` type. No 
AsyncInput / Envelope per tell —
+     *    those are amortized across the batch.
+     */
+    // Not marked `private` so that `class StageActor`'s aux constructor 
(compiled outside of the companion
+    // object on Scala 3) can reference it; the enclosing `object StageActor` 
is itself private.
+    final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (((ActorRef, Any)) => Unit) {
+
+      // IDLE/SCHEDULED election state. AtomicInteger gives us volatile read + 
CAS without the cross-Scala
+      // VarHandle / field-updater access fuss; the wrapper costs one extra 
reference per StageActor, which
+      // is negligible against the per-tell mailbox traffic we are saving.
+      private[this] val state = new AtomicInteger(SchedStateIdle)
+
+      // Reused across all drain batches; allocated once at construction.
+      private[this] val drainCallback: Any => Unit = (_: Any) => drain()
+
+      override def apply(pair: (ActorRef, Any)): Unit = {
+        add(pair) // Vyukov producer path: getAndSet + release-store, no CAS 
spin
+        // Double-checked CAS: uncontended fast path is one volatile read; 
only the IDLE->SCHEDULED winner
+        // pays a CAS + mailbox push.
+        if (state.get() == SchedStateIdle && 
state.compareAndSet(SchedStateIdle, SchedStateScheduled))
+          scheduleDrain()
+      }

Review Comment:
   `LazyDispatch` keeps enqueuing messages (and attempting to schedule a drain)
even after the stage has completed. `GraphInterpreter.runAsyncInput`, however,
explicitly skips invoking the handler when `isStageCompleted(logic)` is true, so
a drain scheduled after completion never runs. As a result:
- `state` is never reset from `SCHEDULED` to `IDLE`.
- Every subsequent tell only calls `add` without electing a new drain.
- Queued items are never polled.

This risks unbounded queue growth until the stage actor is stopped or
garbage-collected. Guard the producer path and drop messages once the stage has
completed. This matches the old semantics, in which completed stages silently
ignored async inputs.



##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -206,29 +207,61 @@ object GraphStageLogic {
    *
    * Not for user instantiation, use [[GraphStageLogic.getStageActor]].
    */
-  final class StageActor @InternalApi() private[pekko] (
+  final class StageActor @InternalApi() private (
       materializer: Materializer,
-      getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, 
Any)],
       initialReceive: StageActorRef.Receive,
-      name: String) {
+      name: String,
+      cell: ActorCell,
+      buildDispatch: StageActorRef.Receive => ((ActorRef, Any)) => Unit) {
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        getAsyncCallback: StageActorRef.Receive => AsyncCallback[(ActorRef, 
Any)],
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        receive => getAsyncCallback(receive).invoke)
+
+    @InternalApi private[pekko] def this(
+        materializer: Materializer,
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        initialReceive: StageActorRef.Receive,
+        name: String) =
+      this(
+        materializer,
+        initialReceive,
+        name,
+        StageActor.localCell(materializer.supervisor, "Stream supervisor"),
+        // Coalesce per-tell mailbox traffic: N tells produce 1 AsyncInput 
envelope (amortized).
+        receive =>
+          new StageActor.LazyDispatch(
+            interpreter,
+            logic,
+            receive.asInstanceOf[Any => Unit],
+            StageActor.drainBatchSize(materializer)))
+
+    // Monomorphic Function1 captured once; JIT can inline the apply at the 
FunctionRef call site.
+    private val dispatch: ((ActorRef, Any)) => Unit = 
buildDispatch(internalReceive)
 
-    private val callback = getAsyncCallback(internalReceive)
-
-    private def cell = materializer.supervisor match {
-      case ref: LocalActorRef => ref.underlying
-      case unknown            =>
-        throw new IllegalStateException(s"Stream supervisor must be a local 
actor, was [${unknown.getClass.getName}]")
-    }
     private val functionRef: FunctionRef = {
-      val f: (ActorRef, Any) => Unit = {
-        case (_, m @ (PoisonPill | Kill)) =>
-          materializer.logger.warning(
-            "{} message sent to StageActor({}) will be ignored, since it is 
not a real Actor." +
-            "Use a custom message type to communicate with it instead.",
-            m,
-            functionRef.path)
-        case pair => callback.invoke(pair)
-      }
+      // Explicit (sender, msg) lambda (not a pattern-match Function2 literal) 
so the PoisonPill / Kill
+      // branch matches on `msg` directly and does not allocate a Tuple2. The 
regular branch still
+      // constructs one tuple per tell, as required by the `((ActorRef, Any)) 
=> Unit` public Receive type.
+      val f: (ActorRef, Any) => Unit = (sender, msg) =>
+        msg match {
+          case PoisonPill | Kill =>
+            materializer.logger.warning(
+              "{} message sent to StageActor({}) will be ignored, since it is 
not a real Actor." +
+              "Use a custom message type to communicate with it instead.",

Review Comment:
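   The concatenated warning message is missing a space between its two
sentences, so logs read "...not a real Actor.Use a custom message type...".
The removed code had the same problem. Add a trailing space to the first string
literal, or a leading space to the second, so the sentences are separated.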



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to