Copilot commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3333598145
##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -1382,7 +1529,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Override and return a name to be given to the StageActor of this operator.
*
* This method will be only invoked and used once, during the first [[getStageActor]]
- * invocation whichc reates the actor, since subsequent `getStageActors` calls function
+ * invocation which creates the actor, since subsequent `getStageActors` calls function
* like `become`, rather than creating new actors.
Review Comment:
Scaladoc refers to `getStageActors` (plural), but the public API method is
`getStageActor`. This makes the documentation misleading and the inline code
reference unsearchable.
##########
stream/src/main/java/org/apache/pekko/stream/stage/AbstractMpscDispatch.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream.stage;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.apache.pekko.dispatch.AbstractNodeQueue;
+
+/**
+ * INTERNAL API.
+ *
+ * <p>Vyukov MPSC linked queue augmented with a volatile {@code int} election slot used by
+ * single-consumer drain coalescing: producers race to elect a drainer via IDLE -> SCHEDULED CAS
+ * on the state field. The state is driven through a {@link VarHandle} declared here so Scala
+ * callers can use the typed {@link #stateAcquire()} / {@link #stateSetRelease(int)} / {@link
+ * #stateCompareAndSet(int, int)} accessors without hitting Scala's polymorphic-signature inference
+ * quirks for {@code VarHandle.getAcquire} (which differ between Scala 2 and Scala 3). Same layering
+ * as {@code AbstractMailbox} for {@code Mailbox}.
+ */
+public abstract class AbstractMpscDispatch<T> extends AbstractNodeQueue<T> {
+
+ @SuppressWarnings("unused")
+ private volatile int _stateDoNotCallMeDirectly = 0;
+
+ private static final VarHandle stateHandle;
+
+ static {
+ try {
+ stateHandle =
+ MethodHandles.privateLookupIn(AbstractMpscDispatch.class, MethodHandles.lookup())
+ .findVarHandle(AbstractMpscDispatch.class, "_stateDoNotCallMeDirectly", int.class);
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+
+ /** Acquire-load of the election state. Use only from the consumer (drain) side. */
+ public final int stateAcquire() {
+ return (int) stateHandle.getAcquire(this);
+ }
Review Comment:
`stateAcquire()` is documented as consumer-only, but it is used from
producer threads in `StageActor.LazyDispatch.apply` to implement the
double-checked election. The Javadoc should reflect that it is safe/expected to
call from any thread (or provide a separate consumer-only accessor).
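
For context, a double-checked election generally has the shape sketched below, which is why the state load ends up on the producer path. This is a minimal illustration only, not the code in this PR: the class `ElectionSketch`, its `offer`/`drain` methods, and the `IDLE`/`SCHEDULED` constants are hypothetical names, and it deliberately uses `AtomicInteger` and `ConcurrentLinkedQueue` (full volatile semantics) instead of the `VarHandle` acquire/release accessors, so it says nothing about whether the weaker orderings in the PR are sufficient.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of drain coalescing via a double-checked election.
// Not the PR's implementation; names and memory orderings are assumptions.
final class ElectionSketch {
  static final int IDLE = 0;
  static final int SCHEDULED = 1;

  private final AtomicInteger state = new AtomicInteger(IDLE);
  private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

  /**
   * Producer side, callable from any thread. Returns true if the caller won
   * the election and is therefore responsible for scheduling a drain.
   */
  boolean offer(String msg) {
    queue.add(msg);
    // First check: a plain load of the state. If a drain is already
    // scheduled, skip the more expensive CAS entirely.
    if (state.get() != IDLE) {
      return false;
    }
    // Second check: the CAS decides which racing producer schedules the drain.
    return state.compareAndSet(IDLE, SCHEDULED);
  }

  /** Consumer side. Returns the number of messages drained. */
  int drain() {
    int drained = 0;
    do {
      while (queue.poll() != null) {
        drained++;
      }
      state.set(IDLE);
      // Re-check after going idle: a producer may have enqueued while the
      // state was still SCHEDULED and skipped its CAS. If the re-election
      // CAS fails, another producer won and will schedule the next drain.
    } while (!queue.isEmpty() && state.compareAndSet(IDLE, SCHEDULED));
    return drained;
  }
}
```

The first check in `offer` is a state load performed by the producer, which is the same role `stateAcquire()` plays when called from `StageActor.LazyDispatch.apply`.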
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]