Copilot commented on code in PR #2483: URL: https://github.com/apache/groovy/pull/2483#discussion_r3158549838
########## src/test/groovy/groovy/concurrent/BroadcastChannelAsPublisherTest.groovy: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent + +import org.junit.jupiter.api.Test + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Flow +import java.util.concurrent.TimeUnit + +import static groovy.test.GroovyAssert.assertScript + +final class BroadcastChannelAsPublisherTest { + + @Test + void deliversValuesToSingleSubscriber() { + BroadcastChannel<String> broadcast = BroadcastChannel.create() + Flow.Publisher<String> publisher = broadcast.asPublisher() + + List<String> received = [] + CountDownLatch done = new CountDownLatch(1) + + publisher.subscribe(new Flow.Subscriber<String>() { + @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) } + @Override void onNext(String item) { received << item } + @Override void onError(Throwable t) { done.countDown() } + @Override void onComplete() { done.countDown() } + }) + + Thread.sleep(50) // give subscriber time to wire up + broadcast.send('a') Review Comment: The `Thread.sleep(50)` calls intended to "give subscriber time to wire up" make these tests timing-dependent and slower 
than necessary. Since `subscribe(...)` invokes `onSubscribe` synchronously, you can usually remove the sleep or replace it with a deterministic latch/condition (e.g., wait until you have the `Subscription`, or until `broadcast.subscriberCount` reflects the subscription) before producing values. ########## src/main/java/groovy/concurrent/BroadcastChannel.java: ########## @@ -140,4 +144,149 @@ public boolean isClosed() { public int getSubscriberCount() { return subscribers.size(); } + + /** + * Returns a {@link Flow.Publisher} view of this broadcast channel. Each + * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the returned + * publisher creates a new {@link AsyncChannel} subscriber under the hood, + * draining values to the downstream subscriber according to its requested + * demand. + * <p> + * Semantics: + * <ul> + * <li>Cold per-subscribe binding: each subscription starts seeing values + * from the moment it subscribes (consistent with + * {@link #subscribe()}).</li> + * <li>Backpressure: respects {@code request(n)}; the worker blocks the + * broadcast send when no demand exists (sender-side backpressure).</li> + * <li>Cancellation: closes this subscriber's channel and removes it + * from the broadcast's subscriber set.</li> + * <li>Completion: signals {@code onComplete} when the broadcast channel + * is closed and the per-subscriber buffer drained.</li> + * </ul> + * <p> + * <b>Backpressure policy (important).</b> This bridge uses lossless, + * sender-gated backpressure: {@link #send(Object)} awaits delivery to + * every live subscriber, and each per-subscriber channel has a bounded + * buffer (default 16). A subscriber that never calls {@code request(n)}, + * or that requests slowly, will fill its buffer; once full, the + * subscriber's channel suspends its backing {@code send}, which in turn + * stalls {@code BroadcastChannel.send(...)} for <em>all</em> + * subscribers. In other words, the slowest subscriber controls producer + * throughput. 
+ * <p> + * This is intentional and matches the point-to-point semantics of + * {@link #subscribe()}: values are neither dropped nor reordered. If + * you need decoupled per-subscriber policies (drop-newest, drop-oldest, + * latest-only, or unbounded buffering), wrap the publisher with a + * Reactive Streams operator of your choice, or use a subscriber that + * drains promptly with {@code request(Long.MAX_VALUE)}. + * + * @return a {@code Flow.Publisher} backed by per-subscriber channels + * @since 6.0.0 + */ + public Flow.Publisher<T> asPublisher() { + return new BroadcastFlowPublisher(); + } + + private final class BroadcastFlowPublisher implements Flow.Publisher<T> { + @Override + public void subscribe(Flow.Subscriber<? super T> downstream) { + Objects.requireNonNull(downstream, "subscriber must not be null"); + AsyncChannel<T> channel; + try { + channel = BroadcastChannel.this.subscribe(); + } catch (ChannelClosedException e) { + downstream.onSubscribe(NOOP_SUBSCRIPTION); + downstream.onError(e); + return; + } + new BroadcastFlowSubscription<>(BroadcastChannel.this, channel, downstream).start(); + } + } + + private static final Flow.Subscription NOOP_SUBSCRIPTION = new Flow.Subscription() { + @Override public void request(long n) { } + @Override public void cancel() { } + }; + + /** + * Per-subscriber bridge that converts a backing {@link AsyncChannel} into + * a Reactive Streams subscription with demand tracking. + */ + private static final class BroadcastFlowSubscription<T> implements Flow.Subscription { + private final BroadcastChannel<T> owner; + private final AsyncChannel<T> channel; + private final Flow.Subscriber<? super T> subscriber; + private final AtomicLong demand = new AtomicLong(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private final Object lock = new Object(); + + BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T> channel, Flow.Subscriber<? 
super T> subscriber) { + this.owner = owner; + this.channel = channel; + this.subscriber = subscriber; + } + + void start() { + subscriber.onSubscribe(this); + AsyncSupport.getExecutor().execute(this::drain); + } + + @Override + public void request(long n) { + if (n <= 0) { + cancel(); + subscriber.onError(new IllegalArgumentException( + "Reactive Streams §3.9: request must be positive, got " + n)); + return; Review Comment: `request(long n)` calls `subscriber.onError(...)` directly on the caller thread. If a downstream calls `request(0)`/`request(-1)` from a different thread while `drain()` is concurrently calling `onNext`, this can violate the Reactive Streams rule that signals (`onNext`/`onError`/`onComplete`) must be serialized (non-concurrent). Consider routing this terminal error through the same single-threaded drain path (e.g., record a terminal error + wake drain) so all downstream signals come from one place. ########## src/spec/doc/core-concurrent-actors.adoc: ########## @@ -93,6 +93,40 @@ inventory.send { state -> assert await(inventory.getAsync()) == [apples: 20, bananas: 10] ---- +=== Observing state changes + +An agent exposes a `Flow.Publisher<T>` of state transitions via +`changes()`. Each successful update emits the new value to every +subscriber that has subscribed since its last reset. The stream is +hot (no replay of prior state), per-subscriber buffered, and closes +with `onComplete` when `shutdown()` is called: Review Comment: The wording "subscribed since its last reset" is misleading here: `Agent` doesn't expose a reset concept, and `changes()` is purely hot/no-replay (subscribers see updates after they subscribe). Please rephrase to avoid implying a resettable/replay boundary that doesn't exist. ```suggestion subscriber that is already subscribed at the time of the update. 
The stream is hot (no replay of prior state), per-subscriber buffered, and closes with `onComplete` when `shutdown()` is called: ``` ########## src/main/java/groovy/concurrent/Agent.java: ########## @@ -179,24 +189,100 @@ public Awaitable<T> sendAndGet(Function<T, T> updateFn) { /** * Shuts down the agent's update executor. No further updates will * be accepted. Pending updates are executed before shutdown completes. + * The changes publisher (if any subscribers attached) is closed after + * pending updates drain, signalling {@code onComplete} to all live + * subscribers. Calling {@code shutdown()} more than once is a no-op. */ public void shutdown() { + SubmissionPublisher<T> p; + synchronized (lifecycleLock) { + if (shutdownInvoked) return; + shutdownInvoked = true; + p = changesPublisher; + } + if (p != null) { + // Submit close as a terminator task so it runs after any queued + // updates have drained — ensures applyUpdate never offers to a + // publisher that has already been closed. + try { + updateExecutor.execute(p::close); + } catch (RejectedExecutionException ex) { + p.close(); + } + } updateExecutor.shutdown(); } + /** + * Returns a {@link Flow.Publisher} that emits the agent's value after + * every successful update. The publisher is hot and per-subscriber: + * <ul> + * <li>Subscribers see only changes that occur after they subscribe; + * the current value at subscription time is <em>not</em> replayed.</li> + * <li>Each subscriber gets an independent buffer (default + * {@value #DEFAULT_CHANGES_BUFFER} items).</li> + * <li>Slow subscribers drop the most recent value rather than + * blocking the agent's update thread. Values already buffered + * are delivered in order; only newly-offered values that cannot + * fit are discarded.</li> + * <li>Closes (signals {@code onComplete}) when {@link #shutdown()} is + * called. 
If {@code changes()} is first called after + * {@code shutdown()}, the returned publisher is already closed + * and subscribers receive {@code onComplete} immediately.</li> + * </ul> + * <p> + * Typical use: + * <pre>{@code + * for await (newValue in agent.changes()) { + * log.info "Agent value is now {}", newValue + * } + * }</pre> + * + * @return a hot publisher of state transitions + * @since 6.0.0 + */ + public Flow.Publisher<T> changes() { + SubmissionPublisher<T> p = changesPublisher; + if (p == null) { + synchronized (lifecycleLock) { + p = changesPublisher; + if (p == null) { + p = new SubmissionPublisher<>( + AsyncSupport.getExecutor(), DEFAULT_CHANGES_BUFFER); + if (shutdownInvoked) p.close(); + changesPublisher = p; + } + } + } + return p; + } + @Override public String toString() { return "Agent[" + get() + "]"; } private T applyUpdate(Function<T, T> updateFn) { rwLock.writeLock().lock(); + T newValue; try { value = updateFn.apply(value); - return value; + newValue = value; } finally { rwLock.writeLock().unlock(); } + SubmissionPublisher<T> p = changesPublisher; + if (p != null) { + try { + // Non-blocking offer: drop on slow subscribers rather than + // back-pressuring the agent's serialised update loop. + p.offer(newValue, (sub, item) -> false); + } catch (IllegalStateException ignore) { + // Publisher was closed concurrently (changes() created + // after shutdown() marked the flag). Drop this emission. + } Review Comment: The inline comment says this `offer` is used to "drop on slow subscribers", but the `onDrop` predicate always returns `false`, and then `IllegalStateException` is swallowed. This makes the drop behavior implicit/exception-driven and risks dropping emissions more broadly than intended. Consider using an explicit `onDrop` policy that matches the documented semantics (e.g., return the value that indicates dropping for the lagging subscriber) and avoid relying on catching `IllegalStateException` for normal flow control. 
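A minimal sketch of what an explicit policy could look like, for illustration only. `DroppingEmitter`, `NoDemandSubscriber`, and the two static helpers are hypothetical names that are not part of this PR; the only JDK calls used are `SubmissionPublisher.offer`, `isClosed`, `subscribe`, and `close`. Per the JDK documentation, an `onDrop` handler that returns `false` drops the item for that subscriber without a retry, and `true` re-attempts the offer once, so the sketch keeps `false` but counts each drop. It also checks `isClosed()` first so that shutdown is not normally signalled by an exception. That check can still race with `close()`, so a narrow catch remains as a guard:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;

public class ExplicitDropPolicy {

    /** Hypothetical wrapper (not part of the PR) that makes the drop policy visible. */
    static final class DroppingEmitter<T> {
        private final SubmissionPublisher<T> publisher;
        private final AtomicLong dropped = new AtomicLong();

        DroppingEmitter(SubmissionPublisher<T> publisher) {
            this.publisher = publisher;
        }

        /** Returns false only when the publisher is closed; drops are counted, not thrown. */
        boolean emit(T value) {
            if (publisher.isClosed()) {
                return false; // usual path after shutdown, no exception involved
            }
            try {
                publisher.offer(value, (subscriber, item) -> {
                    dropped.incrementAndGet(); // one call per lagging subscriber
                    return false;              // false = drop, do not retry the offer
                });
                return true;
            } catch (IllegalStateException closedConcurrently) {
                // close() won the race after the isClosed() check above.
                return false;
            }
        }

        long droppedCount() {
            return dropped.get();
        }
    }

    /** Subscriber that never requests anything, so its buffer can only fill up. */
    static final class NoDemandSubscriber<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription s) { }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Offers {@code count} values to a subscriber with zero demand and reports the drops. */
    static long dropsWithoutDemand(int count) {
        // Same-thread executor and a tiny buffer keep the demonstration deterministic.
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(Runnable::run, 2);
        publisher.subscribe(new NoDemandSubscriber<Integer>());
        DroppingEmitter<Integer> emitter = new DroppingEmitter<>(publisher);
        for (int i = 0; i < count; i++) {
            emitter.emit(i);
        }
        long drops = emitter.droppedCount();
        publisher.close();
        return drops;
    }

    /** Emitting into an already closed publisher is rejected without throwing. */
    static boolean emitAfterClose() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(Runnable::run, 2);
        publisher.close();
        return new DroppingEmitter<>(publisher).emit(1);
    }

    public static void main(String[] args) {
        System.out.println("drops with no demand: " + dropsWithoutDemand(100));
        System.out.println("emit after close accepted: " + emitAfterClose());
    }
}
```

Whether counting drops is useful in `Agent` is a design choice for the PR author. The sketch only shows that the drop decision and the closed-publisher case can be handled as two separate, visible paths.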
########## src/test/groovy/groovy/concurrent/AgentChangesTest.groovy: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.concurrent + +import org.junit.jupiter.api.Test + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Flow +import java.util.concurrent.TimeUnit + +import static groovy.test.GroovyAssert.assertScript + +final class AgentChangesTest { + + @Test + void emitsValueAfterEachUpdate() { + Agent<Integer> agent = Agent.create(0) + try { + List<Integer> received = Collections.synchronizedList([]) + CountDownLatch sawThree = new CountDownLatch(3) + + agent.changes().subscribe(new Flow.Subscriber<Integer>() { + @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) } + @Override void onNext(Integer item) { received << item; sawThree.countDown() } + @Override void onError(Throwable t) {} + @Override void onComplete() {} + }) + Thread.sleep(50) // let subscription register + Review Comment: Several tests use `Thread.sleep(50)` to "let subscription register" after calling `agent.changes().subscribe(...)`. 
Since the subscription handshake (`onSubscribe`) happens during `subscribe`, these sleeps are typically unnecessary and can introduce avoidable timing sensitivity; prefer a latch/condition tied to the subscription (or remove the sleep if not needed). ########## src/main/java/groovy/concurrent/BroadcastChannel.java: ########## @@ -140,4 +144,149 @@ public boolean isClosed() { public int getSubscriberCount() { return subscribers.size(); } + + /** + * Returns a {@link Flow.Publisher} view of this broadcast channel. Each + * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the returned + * publisher creates a new {@link AsyncChannel} subscriber under the hood, + * draining values to the downstream subscriber according to its requested + * demand. + * <p> + * Semantics: + * <ul> + * <li>Cold per-subscribe binding: each subscription starts seeing values + * from the moment it subscribes (consistent with + * {@link #subscribe()}).</li> + * <li>Backpressure: respects {@code request(n)}; the worker blocks the + * broadcast send when no demand exists (sender-side backpressure).</li> + * <li>Cancellation: closes this subscriber's channel and removes it + * from the broadcast's subscriber set.</li> + * <li>Completion: signals {@code onComplete} when the broadcast channel + * is closed and the per-subscriber buffer drained.</li> + * </ul> + * <p> + * <b>Backpressure policy (important).</b> This bridge uses lossless, + * sender-gated backpressure: {@link #send(Object)} awaits delivery to + * every live subscriber, and each per-subscriber channel has a bounded + * buffer (default 16). A subscriber that never calls {@code request(n)}, + * or that requests slowly, will fill its buffer; once full, the + * subscriber's channel suspends its backing {@code send}, which in turn + * stalls {@code BroadcastChannel.send(...)} for <em>all</em> + * subscribers. In other words, the slowest subscriber controls producer + * throughput. 
+ * <p> + * This is intentional and matches the point-to-point semantics of + * {@link #subscribe()}: values are neither dropped nor reordered. If + * you need decoupled per-subscriber policies (drop-newest, drop-oldest, + * latest-only, or unbounded buffering), wrap the publisher with a + * Reactive Streams operator of your choice, or use a subscriber that + * drains promptly with {@code request(Long.MAX_VALUE)}. + * + * @return a {@code Flow.Publisher} backed by per-subscriber channels + * @since 6.0.0 + */ + public Flow.Publisher<T> asPublisher() { + return new BroadcastFlowPublisher(); + } + + private final class BroadcastFlowPublisher implements Flow.Publisher<T> { + @Override + public void subscribe(Flow.Subscriber<? super T> downstream) { + Objects.requireNonNull(downstream, "subscriber must not be null"); + AsyncChannel<T> channel; + try { + channel = BroadcastChannel.this.subscribe(); + } catch (ChannelClosedException e) { + downstream.onSubscribe(NOOP_SUBSCRIPTION); + downstream.onError(e); + return; + } + new BroadcastFlowSubscription<>(BroadcastChannel.this, channel, downstream).start(); + } + } + + private static final Flow.Subscription NOOP_SUBSCRIPTION = new Flow.Subscription() { + @Override public void request(long n) { } + @Override public void cancel() { } + }; + + /** + * Per-subscriber bridge that converts a backing {@link AsyncChannel} into + * a Reactive Streams subscription with demand tracking. + */ + private static final class BroadcastFlowSubscription<T> implements Flow.Subscription { + private final BroadcastChannel<T> owner; + private final AsyncChannel<T> channel; + private final Flow.Subscriber<? super T> subscriber; + private final AtomicLong demand = new AtomicLong(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private final Object lock = new Object(); + + BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T> channel, Flow.Subscriber<? 
super T> subscriber) { + this.owner = owner; + this.channel = channel; + this.subscriber = subscriber; + } + + void start() { + subscriber.onSubscribe(this); + AsyncSupport.getExecutor().execute(this::drain); + } + + @Override + public void request(long n) { + if (n <= 0) { + cancel(); + subscriber.onError(new IllegalArgumentException( + "Reactive Streams §3.9: request must be positive, got " + n)); + return; + } + long prev, next; + do { + prev = demand.get(); + next = prev + n; + if (next < 0) next = Long.MAX_VALUE; // saturate on overflow + } while (!demand.compareAndSet(prev, next)); + synchronized (lock) { lock.notifyAll(); } + } + + @Override + public void cancel() { + if (!cancelled.compareAndSet(false, true)) return; + owner.subscribers.remove(channel); + channel.close(); + synchronized (lock) { lock.notifyAll(); } + } + + private void drain() { + try { + while (!cancelled.get()) { + if (demand.get() == 0) { + synchronized (lock) { + while (demand.get() == 0 && !cancelled.get()) { + lock.wait(); Review Comment: In the drain loop, terminal completion is effectively backpressured: when `demand == 0` the worker waits on `lock`, so a subscriber that never calls `request(n)` will never observe `onComplete` after the broadcast/channel is closed (and the drain task can remain parked indefinitely). Reactive Streams terminal signals must not depend on demand. Consider checking `channel.isClosed()`/`channel.getBufferedSize()` while waiting (or otherwise unblocking on close) and emitting `onComplete` once the channel is closed+drained even with zero demand. ```suggestion if (channel.isClosed() && channel.getBufferedSize() == 0) { subscriber.onComplete(); return; } lock.wait(100L); ``` ########## subprojects/groovy-http-builder/src/test/groovy/groovy/http/MultiExchangeTickerEndToEndTest.groovy: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package groovy.http + +import com.sun.net.httpserver.HttpServer +import groovy.concurrent.Agent +import groovy.concurrent.AsyncChannel +import groovy.concurrent.BroadcastChannel +import groovy.concurrent.ChannelClosedException +import groovy.json.JsonSlurper +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Flow +import java.util.concurrent.TimeUnit + +import static org.apache.groovy.runtime.async.AsyncSupport.await + +/** + * End-to-end integration of {@link HttpStreamResult#bodyAsLinePublisher()}, + * {@link BroadcastChannel#asPublisher()} and {@link Agent#changes()}. + * <p> + * Three mock exchanges stream NDJSON ticks over chunked HTTP. The pipeline + * fans them into a single {@link AsyncChannel}, updates a per-symbol stats + * {@link Agent}, and routes threshold-crossing alerts through a broadcast + * topic. Two independent consumers — one on the agent's change stream, one + * on the alert broadcast — verify the wiring end-to-end. 
+ * + * <pre> + * binance (NDJSON) coinbase (NDJSON) kraken (NDJSON) + * │ │ │ + * ▼ ▼ ▼ + * getStreamAsync getStreamAsync getStreamAsync + * │ │ │ + * ╰── bodyAsLinePublisher() (Flow.Publisher<String>) ──╮ + * │ + * for await (line in publisher) │ ← FlowPublisherAdapter + * │ │ + * ▼ │ + * AsyncChannel<Map> (unified) ◀───────╯ + * │ + * ▼ + * aggregator: stats.send { … } + * │ + * ▼ + * Agent<Map<String, Map>> (per-symbol stats) + * │ │ + * │ if price >= threshold + * ▼ ▼ + * agent.changes() BroadcastChannel<String> + * Flow.Publisher │ + * │ ╭────────────┴─────────────╮ + * │ │ │ + * ▼ ▼ ▼ + * Flow.Subscriber asPublisher() subscribe() + * stateSnapshots Flow.Subscriber for await Review Comment: The class-level diagram/comment says the broadcast Iterable consumer uses `for await`, but the test code actually iterates the `AsyncChannel` with a regular `for (a in alertChannel)`. Either update the diagram or use `for await` in the code to keep the end-to-end narrative consistent. ```suggestion * stateSnapshots Flow.Subscriber for (a in alertChannel) ```
