He-Pin commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3447514593


##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -275,6 +309,135 @@ object GraphStageLogic {
     type Receive = ((ActorRef, Any)) => Unit
   }
 
+  private object StageActor {
+    def localCell(ref: ActorRef, description: String): ActorCell =
+      ref match {
+        case ref: LocalActorRef       => ref.underlying
+        case ref: RepointableActorRef =>
+          ref.underlying match {
+            case cell: ActorCell => cell
+            case unknown         =>
+              throw new IllegalStateException(s"$description must be a local 
actor, was [${unknown.getClass.getName}]")
+          }
+        case unknown =>
+          throw new IllegalStateException(s"$description must be a local 
actor, was [${unknown.getClass.getName}]")
+      }
+
+    /**
+     * Reads `pekko.stream.materializer.stage-actor-drain-batch` from the 
materializer's ActorSystem config.
+     * Called once per lazy StageActor construction (never on the hot path). 
Bounded to `>= 1`.
+     */
+    def drainBatchSize(materializer: Materializer): Int =
+      Math.max(1, 
materializer.system.settings.config.getInt("pekko.stream.materializer.stage-actor-drain-batch"))
+
+    private final val SchedStateIdle: Int = 0
+    private final val SchedStateScheduled: Int = 1
+
+    /**
+     * Lazy-path dispatch: producers enqueue into a Vyukov MPSC queue and 
elect a single drain via
+     * IDLE -> SCHEDULED CAS; only the elected producer pays a mailbox 
enqueue. The drain runs on the
+     * interpreter thread, polls in a tight loop bounded by `drainBatchSize`, 
then either publishes IDLE
+     * (with a recheck for the publish-window race) or re-schedules another 
envelope to yield to other
+     * BoundaryEvents.
+     *
+     * JIT/GC notes:
+     *  - `final class` + monomorphic per-StageActor instance → JIT 
devirtualizes the apply at the
+     *    FunctionRef call site.
+     *  - Extends `AbstractNodeQueue` directly so the queue head atomic and 
the dispatch function share one
+     *    object (one allocation per StageActor, one fewer field deref on the 
producer hot path).
+     *  - Implements `Any => Unit` directly — serves as both the producer 
callback and the drain callback,
+     *    eliminating the separate `drainCallback` lambda allocation.
+     *  - `state` is a plain `@volatile var Int` driven by a static 
`VarHandle` in the companion object
+     *    (via `MethodHandles.privateLookupIn`), same pattern as 
`AbstractNodeQueue` itself;
+     *    avoids per-instance `AtomicInteger`.
+     *  - `drainBatchSize` is read once into a stack-local at the top of 
`drain` so the JIT can treat the loop
+     *    bound as a constant.
+     *  - Per-tell allocation = 1 Node (`AbstractNodeQueue.Node`, ~24 bytes) + 
1 Tuple2 (~24 bytes). The
+     *    Tuple2 is forced by the public `StageActorRef.Receive` type. No 
AsyncInput / Envelope per tell —
+     *    those are amortized across the batch.
+     */
+    // Not marked `private` so that `class StageActor`'s aux constructor 
(compiled outside of the companion
+    // object on Scala 3) can reference it; the enclosing `object StageActor` 
is itself private.
+    final class LazyDispatch(
+        interpreter: GraphInterpreter,
+        logic: GraphStageLogic,
+        handler: Any => Unit,
+        drainBatchSize: Int)
+        extends AbstractNodeQueue[(ActorRef, Any)]
+        with (Any => Unit) {
+
+      // IDLE/SCHEDULED election state. VarHandle avoids per-instance 
AtomicInteger;
+      // the handle lives in the companion object (true JVM static), same 
pattern as
+      // AbstractNodeQueue._tailDoNotCallMeDirectly.
+      @volatile var state: Int = SchedStateIdle

Review Comment:
   Rechecked this and updated #3116. Instead of switching to `AtomicInteger`, 
the PR now keeps the VarHandle-backed `@volatile var state`, uses `getVolatile` 
/ `setVolatile` / `compareAndSet`, and retains a direct private reference to 
the field (`_preventPrivateUnusedErasure`) like the existing Pekko VarHandle 
fields. This avoids per-instance `AtomicInteger` allocation while addressing 
the private-field retention concern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to