He-Pin commented on code in PR #3035:
URL: https://github.com/apache/pekko/pull/3035#discussion_r3343523873
##########
stream/src/main/java/org/apache/pekko/stream/stage/AbstractMpscDispatch.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.stream.stage;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.apache.pekko.dispatch.AbstractNodeQueue;
+
+/**
+ * INTERNAL API.
+ *
+ * <p>Vyukov MPSC linked queue augmented with a volatile {@code int} election slot used by
+ * single-consumer drain coalescing: producers race to elect a drainer via IDLE -> SCHEDULED CAS
+ * on the state field. The state is driven through a {@link VarHandle} declared here so Scala
+ * callers can use the typed {@link #stateAcquire()} / {@link #stateSetRelease(int)} / {@link
+ * #stateCompareAndSet(int, int)} accessors without hitting Scala's polymorphic-signature inference
+ * quirks for {@code VarHandle.getAcquire} (which differ between Scala 2 and Scala 3). Same layering
+ * as {@code AbstractMailbox} for {@code Mailbox}.
+ */
+public abstract class AbstractMpscDispatch<T> extends AbstractNodeQueue<T> {
+
+  @SuppressWarnings("unused")
+  private volatile int _stateDoNotCallMeDirectly = 0;
+
+  private static final VarHandle stateHandle;
+
+  static {
+    try {
+      stateHandle =
+          MethodHandles.privateLookupIn(AbstractMpscDispatch.class, MethodHandles.lookup())
+              .findVarHandle(AbstractMpscDispatch.class, "_stateDoNotCallMeDirectly", int.class);
+    } catch (Throwable t) {
+      throw new ExceptionInInitializerError(t);
+    }
+  }
+
+  /** Acquire-load of the election state. Use only from the consumer (drain) side. */
+  public final int stateAcquire() {
+    return (int) stateHandle.getAcquire(this);
+  }

Review Comment:
Stale: the `AbstractMpscDispatch.java` helper was removed in commit 1b533f8 ("fix: keep lazy stage actor refs under supervisor"). In that commit the dispatch was refactored to a pure Scala `LazyDispatch` that extends `AbstractNodeQueue` directly and uses `AtomicInteger` for the election state, so the Java helper is no longer needed.
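
For readers who do not have the PR open, the IDLE -> SCHEDULED election described in the Javadoc, with an `AtomicInteger` holding the state as the comment says, might look roughly like the sketch below. This is not the `LazyDispatch` code from the PR: the class name `CoalescingDrainSketch`, the method names `offer` and `drain`, and the use of `ConcurrentLinkedQueue` in place of `AbstractNodeQueue` are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the LazyDispatch implementation from the PR.
final class CoalescingDrainSketch<T> {
  private static final int IDLE = 0;
  private static final int SCHEDULED = 1;

  // Stand-in for AbstractNodeQueue so the sketch has no Pekko dependency.
  private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger state = new AtomicInteger(IDLE);

  /**
   * Producer side: enqueue first, then race for the election.
   * Returns true only for the single caller that must schedule a drain.
   */
  boolean offer(T item) {
    queue.add(item);
    return state.compareAndSet(IDLE, SCHEDULED);
  }

  /** Consumer side: must be called only by the elected drainer. */
  List<T> drain() {
    List<T> out = new ArrayList<>();
    for (;;) {
      T item;
      while ((item = queue.poll()) != null) {
        out.add(item);
      }
      // Reopen the election, then re-check: a producer may have enqueued
      // after the last poll but lost its CAS while we were still SCHEDULED.
      state.set(IDLE);
      if (queue.isEmpty() || !state.compareAndSet(IDLE, SCHEDULED)) {
        return out;
      }
    }
  }
}
```

The re-check after resetting to IDLE is the part that matters: without it, an item enqueued between the last `poll` and the reset could sit in the queue with no drainer scheduled.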
