He-Pin commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3343523873


##########
stream/src/main/java/org/apache/pekko/stream/stage/AbstractMpscDispatch.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream.stage;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.apache.pekko.dispatch.AbstractNodeQueue;
+
+/**
+ * INTERNAL API.
+ *
+ * <p>Vyukov MPSC linked queue augmented with a volatile {@code int} election 
slot used by
+ * single-consumer drain coalescing: producers race to elect a drainer via 
IDLE -&gt; SCHEDULED CAS
+ * on the state field. The state is driven through a {@link VarHandle} 
declared here so Scala
+ * callers can use the typed {@link #stateAcquire()} / {@link 
#stateSetRelease(int)} / {@link
+ * #stateCompareAndSet(int, int)} accessors without hitting Scala's 
polymorphic-signature inference
+ * quirks for {@code VarHandle.getAcquire} (which differ between Scala 2 and 
Scala 3). Same layering
+ * as {@code AbstractMailbox} for {@code Mailbox}.
+ */
+public abstract class AbstractMpscDispatch<T> extends AbstractNodeQueue<T> {
+
+  @SuppressWarnings("unused")
+  private volatile int _stateDoNotCallMeDirectly = 0;
+
+  private static final VarHandle stateHandle;
+
+  static {
+    try {
+      stateHandle =
+          MethodHandles.privateLookupIn(AbstractMpscDispatch.class, 
MethodHandles.lookup())
+              .findVarHandle(AbstractMpscDispatch.class, 
"_stateDoNotCallMeDirectly", int.class);
+    } catch (Throwable t) {
+      throw new ExceptionInInitializerError(t);
+    }
+  }
+
+  /** Acquire-load of the election state. Use only from the consumer (drain) 
side. */
+  public final int stateAcquire() {
+    return (int) stateHandle.getAcquire(this);
+  }

Review Comment:
   Stale — the `AbstractMpscDispatch.java` helper was removed in commit 1b533f8 
("fix: keep lazy stage actor refs under supervisor") when the dispatch was 
refactored to a pure Scala `LazyDispatch` that extends `AbstractNodeQueue` 
directly and uses `AtomicInteger` for the election state, dropping the Java 
helper entirely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to