Copilot commented on code in PR #2483:
URL: https://github.com/apache/groovy/pull/2483#discussion_r3142863682


##########
src/main/java/groovy/concurrent/BroadcastChannel.java:
##########
@@ -140,4 +144,149 @@ public boolean isClosed() {
     public int getSubscriberCount() {
         return subscribers.size();
     }
+
+    /**
+     * Returns a {@link Flow.Publisher} view of this broadcast channel. Each
+     * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the returned
+     * publisher creates a new {@link AsyncChannel} subscriber under the hood,
+     * draining values to the downstream subscriber according to its requested
+     * demand.
+     * <p>
+     * Semantics:
+     * <ul>
+     *   <li>Cold per-subscribe binding: each subscription starts seeing values
+     *       from the moment it subscribes (consistent with
+     *       {@link #subscribe()}).</li>
+     *   <li>Backpressure: respects {@code request(n)}; the worker blocks the
+     *       broadcast send when no demand exists (sender-side backpressure).</li>
+     *   <li>Cancellation: closes this subscriber's channel and removes it
+     *       from the broadcast's subscriber set.</li>
+     *   <li>Completion: signals {@code onComplete} when the broadcast channel
+     *       is closed and the per-subscriber buffer drained.</li>
+     * </ul>
+     * <p>
+     * <b>Backpressure policy (important).</b> This bridge uses lossless,
+     * sender-gated backpressure: {@link #send(Object)} awaits delivery to
+     * every live subscriber, and each per-subscriber channel has a bounded
+     * buffer (default 16). A subscriber that never calls {@code request(n)},
+     * or that requests slowly, will fill its buffer; once full, the
+     * subscriber's channel suspends its backing {@code send}, which in turn
+     * stalls {@code BroadcastChannel.send(...)} for <em>all</em>
+     * subscribers. In other words, the slowest subscriber controls producer
+     * throughput.
+     * <p>
+     * This is intentional and matches the point-to-point semantics of
+     * {@link #subscribe()}: values are neither dropped nor reordered. If
+     * you need decoupled per-subscriber policies (drop-newest, drop-oldest,
+     * latest-only, or unbounded buffering), wrap the publisher with a
+     * Reactive Streams operator of your choice, or use a subscriber that
+     * drains promptly with {@code request(Long.MAX_VALUE)}.
+     *
+     * @return a {@code Flow.Publisher} backed by per-subscriber channels
+     * @since 6.0.0
+     */
+    public Flow.Publisher<T> asPublisher() {
+        return new BroadcastFlowPublisher();
+    }
+
+    private final class BroadcastFlowPublisher implements Flow.Publisher<T> {
+        @Override
+        public void subscribe(Flow.Subscriber<? super T> downstream) {
+            Objects.requireNonNull(downstream, "subscriber must not be null");
+            AsyncChannel<T> channel;
+            try {
+                channel = BroadcastChannel.this.subscribe();
+            } catch (ChannelClosedException e) {
+                downstream.onSubscribe(NOOP_SUBSCRIPTION);
+                downstream.onError(e);
+                return;
+            }
+            new BroadcastFlowSubscription<>(BroadcastChannel.this, channel, downstream).start();
+        }
+    }
+
+    private static final Flow.Subscription NOOP_SUBSCRIPTION = new Flow.Subscription() {
+        @Override public void request(long n) { }
+        @Override public void cancel() { }
+    };
+
+    /**
+     * Per-subscriber bridge that converts a backing {@link AsyncChannel} into
+     * a Reactive Streams subscription with demand tracking.
+     */
+    private static final class BroadcastFlowSubscription<T> implements Flow.Subscription {
+        private final BroadcastChannel<T> owner;
+        private final AsyncChannel<T> channel;
+        private final Flow.Subscriber<? super T> subscriber;
+        private final AtomicLong demand = new AtomicLong();
+        private final AtomicBoolean cancelled = new AtomicBoolean();
+        private final Object lock = new Object();
+
+        BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T> channel, Flow.Subscriber<? super T> subscriber) {
+            this.owner = owner;
+            this.channel = channel;
+            this.subscriber = subscriber;
+        }
+
+        void start() {
+            subscriber.onSubscribe(this);

Review Comment:
   BroadcastFlowSubscription.start() calls subscriber.onSubscribe(this) without 
guarding against subscriber bugs. If onSubscribe throws, the backing 
AsyncChannel has already been added to the broadcast's subscribers list and 
will never be drained or removed, which can stall BroadcastChannel.send(...) for 
everyone. Wrap onSubscribe in try/catch and ensure cleanup on failure (remove 
the channel from the subscriber set and close it).
   ```suggestion
               try {
                   subscriber.onSubscribe(this);
               } catch (Throwable t) {
                   cancel();
                   throw t;
               }
   ```



##########
subprojects/groovy-http-builder/src/test/groovy/groovy/http/MultiExchangeTickerEndToEndTest.groovy:
##########
@@ -0,0 +1,261 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.http
+
+import com.sun.net.httpserver.HttpServer
+import groovy.concurrent.Agent
+import groovy.concurrent.AsyncChannel
+import groovy.concurrent.BroadcastChannel
+import groovy.concurrent.ChannelClosedException
+import groovy.json.JsonSlurper
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static org.apache.groovy.runtime.async.AsyncSupport.await
+
+/**
+ * End-to-end integration of {@link HttpStreamResult#bodyAsLinePublisher()},
+ * {@link BroadcastChannel#asPublisher()} and {@link Agent#changes()}.
+ * <p>
+ * Three mock exchanges stream NDJSON ticks over chunked HTTP. The pipeline
+ * fans them into a single {@link AsyncChannel}, updates a per-symbol stats
+ * {@link Agent}, and routes threshold-crossing alerts through a broadcast
+ * topic. Two independent consumers — one on the agent's change stream, one
+ * on the alert broadcast — verify the wiring end-to-end.
+ *
+ * <pre>
+ *      binance (NDJSON)     coinbase (NDJSON)     kraken (NDJSON)
+ *            │                    │                     │
+ *            ▼                    ▼                     ▼
+ *      getStreamAsync        getStreamAsync       getStreamAsync
+ *            │                    │                     │
+ *            ╰── bodyAsLinePublisher() (Flow.Publisher&lt;String&gt;) ──╮
+ *                                                                  │
+ *                              for await (line in publisher)       │ ← FlowPublisherAdapter
+ *                                          │                       │
+ *                                          ▼                       │
+ *                              AsyncChannel&lt;Map&gt; (unified) ◀───────╯
+ *                                          │
+ *                                          ▼
+ *                              aggregator: stats.send { … }
+ *                                          │
+ *                                          ▼
+ *                       Agent&lt;Map&lt;String, Map&gt;&gt; (per-symbol stats)
+ *                          │                            │
+ *                          │                 if price &gt;= threshold
+ *                          ▼                            ▼
+ *                    agent.changes()        BroadcastChannel&lt;String&gt;
+ *                    Flow.Publisher                    │
+ *                          │              ╭────────────┴─────────────╮
+ *                          │              │                          │
+ *                          ▼              ▼                          ▼
+ *                  Flow.Subscriber   asPublisher()              subscribe()
+ *                  stateSnapshots    Flow.Subscriber             for await
+ *                                    alertsViaPublisher          alertsViaIter
+ * </pre>
+ */
+class MultiExchangeTickerEndToEndTest {
+
+    private final List<HttpServer> servers = []
+
+    @BeforeEach
+    void setup() {
+        // 3 exchanges: each emits a fixed sequence of ticks over chunked HTTP.
+        servers << buildExchange('binance',  [50000, 50100, 50250, 50180])
+        servers << buildExchange('coinbase', [50050, 50120, 50300, 50220])
+        servers << buildExchange('kraken',   [50010, 50090, 50260, 50190])
+    }
+
+    @AfterEach
+    void teardown() {
+        servers.each { it.stop(0) }
+    }
+
+    @Test
+    void aggregatesTicksAcrossExchangesAndPublishesAlerts() {
+        var stats = Agent.create([:].withDefault { [count: 0, last: 0d, max: 0d] })
+        var alertTopic = BroadcastChannel.create()
+        var unified = AsyncChannel.<Map>create(64)
+
+        // Subscribers register FIRST so no early signals are dropped:
+        // SubmissionPublisher drops items with no subscribers, and
+        // BroadcastChannel.subscribe() misses pre-subscription broadcasts.
+
+        var stateSnapshots = [].asSynchronized()
+        var sawSomeUpdates = new CountDownLatch(3)
+        agentChangesSubscriber(stats, stateSnapshots, sawSomeUpdates)
+
+        var alertPublisher = alertTopic.asPublisher()
+        var alertsViaPublisher = [].asSynchronized()
+        var alertsDone = new CountDownLatch(1)
+        alertPublisher.subscribe(new Flow.Subscriber<String>() {
+            @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) }
+            @Override void onNext(String alert) { alertsViaPublisher << alert }
+            @Override void onError(Throwable t) { alertsDone.countDown() }
+            @Override void onComplete() { alertsDone.countDown() }
+        })
+
+        // Also iterate the broadcast directly via for await (Iterable view).
+        var alertsViaIter = [].asSynchronized()
+        var alertChannel = alertTopic.subscribe()
+        Thread.startDaemon('alert-iter') {
+            for (a in alertChannel) alertsViaIter << a
+        }
+
+        // Producers and aggregator.
+
+        servers.each { server ->
+            var base = URI.create("http://127.0.0.1:${server.address.port}/")
+            var http = HttpBuilder.http(base.toString())
+            Thread.startDaemon("drain-${base}") {
+                var res = http.getStreamAsync('/ticker').join()
+                drainPublisherIntoChannel(res.bodyAsLinePublisher(), unified)
+            }
+        }
+
+        // When all 3 exchanges have published their 4 ticks each, close the
+        // unified channel so the aggregator loop exits.
+        Thread.startDaemon('unified-closer') {
+            var totalExpected = 3 * 4
+            var seen = 0
+            while (seen < totalExpected) {
+                Thread.sleep(20)
+                seen = ((Map) await(stats.getAsync())).values().sum { (int) it.count } ?: 0
+            }
+            unified.close()
+            alertTopic.close()
+        }
+
+        Thread.startDaemon('aggregator') {
+            for (Map tick in unified) {
+                // The closure passed to stats.send runs on the agent's executor
+                // and would otherwise capture the mutable for-loop variable
+                // `tick` by reference. Bind locals so the queued update sees
+                // the right values.
+                String symbol = tick.symbol
+                double price = (double) tick.price
+                stats.send { Map<String, Map> current ->
+                    var symStats = current[symbol] ?: [count: 0, last: 0d, max: 0d]
+                    var updated = [
+                            count: ((int) symStats.count) + 1,
+                            last : price,
+                            max  : Math.max((double) symStats.max, price),
+                    ]
+                    var next = new HashMap<String, Map>(current)
+                    next.put(symbol, updated)
+                    next
+                }
+                if (price >= 50250) {
+                    try { alertTopic.send("HIGH ${symbol}@${price}".toString()) }
+                    catch (ChannelClosedException ignored) {}
+                }
+            }
+        }
+
+        assert sawSomeUpdates.await(5, TimeUnit.SECONDS), 'agent.changes() did not deliver updates'
+        assert alertsDone.await(5, TimeUnit.SECONDS), 'alert publisher did not complete'
+        Thread.sleep(100)  // settle
+        stats.shutdown()

Review Comment:
   The test uses a fixed Thread.sleep(100) "settle" delay before asserting on 
the iterator-based alert consumer. This can be flaky on slower CI and can hide 
deadlocks. Prefer a deterministic signal (e.g., latch counted down when the 
iterable consumer observes channel close or when it collects the expected 
number of alerts) instead of sleeping.
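
A minimal Java sketch of the latch-based alternative described above. It is not the PR's test code: `BlockingQueue` and the `CLOSED` sentinel are stand-ins (assumptions) for the Groovy `AsyncChannel` and its close signal, and the class and method names are hypothetical. The consumer thread opens the latch when it observes the end of the stream, so the test thread waits on an event rather than on elapsed time.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class LatchedConsumerSketch {
    // Hypothetical end-of-stream marker standing in for "channel closed".
    static final String CLOSED = "<closed>";

    /** Drains the queue on a daemon thread; the returned latch opens once CLOSED is seen. */
    static CountDownLatch consume(BlockingQueue<String> source, List<String> sink) {
        CountDownLatch done = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                for (String item = source.take(); !CLOSED.equals(item); item = source.take()) {
                    sink.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown(); // deterministic "consumer finished" signal
            }
        }, "alert-iter");
        t.setDaemon(true);
        t.start();
        return done;
    }

    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = consume(queue, seen);
        queue.add("HIGH binance@50250.0");
        queue.add("HIGH coinbase@50300.0");
        queue.add(CLOSED);
        try {
            // Bounded wait: a hang fails the test instead of being masked by a sleep.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("consumer did not observe close");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

The timeout on `await` also turns a deadlock into a visible failure, which a fixed sleep would hide.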



##########
src/test/groovy/groovy/concurrent/AgentChangesTest.groovy:
##########
@@ -0,0 +1,208 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent
+
+import org.junit.jupiter.api.Test
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static groovy.test.GroovyAssert.assertScript
+
+final class AgentChangesTest {
+
+    @Test
+    void emitsValueAfterEachUpdate() {
+        Agent<Integer> agent = Agent.create(0)
+        try {
+            List<Integer> received = Collections.synchronizedList([])
+            CountDownLatch sawThree = new CountDownLatch(3)
+
+            agent.changes().subscribe(new Flow.Subscriber<Integer>() {
+                @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) }
+                @Override void onNext(Integer item) { received << item; sawThree.countDown() }
+                @Override void onError(Throwable t) {}
+                @Override void onComplete() {}
+            })
+            Thread.sleep(50)  // let subscription register
+
+            agent.send { it + 1 }
+            agent.send { it + 1 }
+            agent.send { it + 1 }

Review Comment:
   This test uses Thread.sleep(...) to wait for a subscription to register. 
That introduces race/flakiness. Prefer a latch that is counted down from 
onSubscribe and wait on it before sending updates.
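
A sketch of the suggested pattern in plain Java, using the JDK's `SubmissionPublisher` as a stand-in (an assumption) for `agent.changes()`. The class name and the `awaitOrFail` helper are hypothetical. The `subscribed` latch is counted down inside `onSubscribe`, and the test thread waits on it before publishing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SubscribedLatchSketch {
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch subscribed = new CountDownLatch(1);
        CountDownLatch sawThree = new CountDownLatch(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);
                    subscribed.countDown(); // replaces the Thread.sleep(50)
                }
                @Override public void onNext(Integer item) {
                    received.add(item);
                    sawThree.countDown();
                }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            awaitOrFail(subscribed, "onSubscribe");
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
            awaitOrFail(sawThree, "three items");
        }
        return received;
    }

    // Hypothetical helper: bounded wait that fails loudly on timeout.
    private static void awaitOrFail(CountDownLatch latch, String what) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("timed out waiting for " + what);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```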



##########
src/main/java/groovy/concurrent/BroadcastChannel.java:
##########
@@ -140,4 +144,149 @@ public boolean isClosed() {
     public int getSubscriberCount() {
         return subscribers.size();
     }
+
+    /**
+     * Returns a {@link Flow.Publisher} view of this broadcast channel. Each
+     * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the returned
+     * publisher creates a new {@link AsyncChannel} subscriber under the hood,
+     * draining values to the downstream subscriber according to its requested
+     * demand.
+     * <p>
+     * Semantics:
+     * <ul>
+     *   <li>Cold per-subscribe binding: each subscription starts seeing values
+     *       from the moment it subscribes (consistent with
+     *       {@link #subscribe()}).</li>
+     *   <li>Backpressure: respects {@code request(n)}; the worker blocks the
+     *       broadcast send when no demand exists (sender-side backpressure).</li>
+     *   <li>Cancellation: closes this subscriber's channel and removes it
+     *       from the broadcast's subscriber set.</li>
+     *   <li>Completion: signals {@code onComplete} when the broadcast channel
+     *       is closed and the per-subscriber buffer drained.</li>
+     * </ul>
+     * <p>
+     * <b>Backpressure policy (important).</b> This bridge uses lossless,
+     * sender-gated backpressure: {@link #send(Object)} awaits delivery to
+     * every live subscriber, and each per-subscriber channel has a bounded
+     * buffer (default 16). A subscriber that never calls {@code request(n)},
+     * or that requests slowly, will fill its buffer; once full, the
+     * subscriber's channel suspends its backing {@code send}, which in turn
+     * stalls {@code BroadcastChannel.send(...)} for <em>all</em>
+     * subscribers. In other words, the slowest subscriber controls producer
+     * throughput.
+     * <p>
+     * This is intentional and matches the point-to-point semantics of
+     * {@link #subscribe()}: values are neither dropped nor reordered. If
+     * you need decoupled per-subscriber policies (drop-newest, drop-oldest,
+     * latest-only, or unbounded buffering), wrap the publisher with a
+     * Reactive Streams operator of your choice, or use a subscriber that
+     * drains promptly with {@code request(Long.MAX_VALUE)}.
+     *
+     * @return a {@code Flow.Publisher} backed by per-subscriber channels
+     * @since 6.0.0
+     */
+    public Flow.Publisher<T> asPublisher() {
+        return new BroadcastFlowPublisher();
+    }
+
+    private final class BroadcastFlowPublisher implements Flow.Publisher<T> {
+        @Override
+        public void subscribe(Flow.Subscriber<? super T> downstream) {
+            Objects.requireNonNull(downstream, "subscriber must not be null");
+            AsyncChannel<T> channel;
+            try {
+                channel = BroadcastChannel.this.subscribe();
+            } catch (ChannelClosedException e) {
+                downstream.onSubscribe(NOOP_SUBSCRIPTION);
+                downstream.onError(e);

Review Comment:
   BroadcastFlowPublisher.subscribe() reports a closed BroadcastChannel as 
onError(ChannelClosedException). For a Flow.Publisher view, subscribing after 
close is typically treated as normal completion (onComplete) rather than an 
error, and your Javadoc for asPublisher() emphasizes completion on close. 
Consider signalling onComplete (after onSubscribe) when the broadcast is 
already closed instead of onError.
   ```suggestion
                   downstream.onComplete();
   ```
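
For illustration, a self-contained sketch of the signal order this suggestion produces for a late subscriber. `ClosedPublisherSketch` is a hypothetical class, not part of the PR. It models a publisher whose source is already closed: `onSubscribe` is delivered first (Reactive Streams rule 1.9 requires it before any other signal), then `onComplete`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

final class ClosedPublisherSketch implements Flow.Publisher<String> {
    // No-op subscription, mirroring NOOP_SUBSCRIPTION in the diff.
    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber must not be null");
        subscriber.onSubscribe(NOOP); // must precede any terminal signal
        subscriber.onComplete();      // already closed: normal completion, not an error
    }

    /** Records the signals a late subscriber receives, in order. */
    static List<String> run() {
        List<String> signals = new ArrayList<>();
        new ClosedPublisherSketch().subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
            @Override public void onNext(String item) { signals.add("onNext"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }
}
```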



##########
src/main/java/groovy/concurrent/BroadcastChannel.java:
##########
@@ -140,4 +144,149 @@ public boolean isClosed() {
     public int getSubscriberCount() {
         return subscribers.size();
     }
+
+    /**
+     * Returns a {@link Flow.Publisher} view of this broadcast channel. Each
+     * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the returned
+     * publisher creates a new {@link AsyncChannel} subscriber under the hood,
+     * draining values to the downstream subscriber according to its requested
+     * demand.
+     * <p>
+     * Semantics:
+     * <ul>
+     *   <li>Cold per-subscribe binding: each subscription starts seeing values
+     *       from the moment it subscribes (consistent with
+     *       {@link #subscribe()}).</li>
+     *   <li>Backpressure: respects {@code request(n)}; the worker blocks the
+     *       broadcast send when no demand exists (sender-side backpressure).</li>
+     *   <li>Cancellation: closes this subscriber's channel and removes it
+     *       from the broadcast's subscriber set.</li>
+     *   <li>Completion: signals {@code onComplete} when the broadcast channel
+     *       is closed and the per-subscriber buffer drained.</li>
+     * </ul>
+     * <p>
+     * <b>Backpressure policy (important).</b> This bridge uses lossless,
+     * sender-gated backpressure: {@link #send(Object)} awaits delivery to
+     * every live subscriber, and each per-subscriber channel has a bounded
+     * buffer (default 16). A subscriber that never calls {@code request(n)},
+     * or that requests slowly, will fill its buffer; once full, the
+     * subscriber's channel suspends its backing {@code send}, which in turn
+     * stalls {@code BroadcastChannel.send(...)} for <em>all</em>
+     * subscribers. In other words, the slowest subscriber controls producer
+     * throughput.
+     * <p>
+     * This is intentional and matches the point-to-point semantics of
+     * {@link #subscribe()}: values are neither dropped nor reordered. If
+     * you need decoupled per-subscriber policies (drop-newest, drop-oldest,
+     * latest-only, or unbounded buffering), wrap the publisher with a
+     * Reactive Streams operator of your choice, or use a subscriber that
+     * drains promptly with {@code request(Long.MAX_VALUE)}.
+     *
+     * @return a {@code Flow.Publisher} backed by per-subscriber channels
+     * @since 6.0.0
+     */
+    public Flow.Publisher<T> asPublisher() {
+        return new BroadcastFlowPublisher();
+    }
+
+    private final class BroadcastFlowPublisher implements Flow.Publisher<T> {
+        @Override
+        public void subscribe(Flow.Subscriber<? super T> downstream) {
+            Objects.requireNonNull(downstream, "subscriber must not be null");
+            AsyncChannel<T> channel;
+            try {
+                channel = BroadcastChannel.this.subscribe();
+            } catch (ChannelClosedException e) {
+                downstream.onSubscribe(NOOP_SUBSCRIPTION);
+                downstream.onError(e);
+                return;
+            }
+            new BroadcastFlowSubscription<>(BroadcastChannel.this, channel, downstream).start();
+        }
+    }
+
+    private static final Flow.Subscription NOOP_SUBSCRIPTION = new Flow.Subscription() {
+        @Override public void request(long n) { }
+        @Override public void cancel() { }
+    };
+
+    /**
+     * Per-subscriber bridge that converts a backing {@link AsyncChannel} into
+     * a Reactive Streams subscription with demand tracking.
+     */
+    private static final class BroadcastFlowSubscription<T> implements Flow.Subscription {
+        private final BroadcastChannel<T> owner;
+        private final AsyncChannel<T> channel;
+        private final Flow.Subscriber<? super T> subscriber;
+        private final AtomicLong demand = new AtomicLong();
+        private final AtomicBoolean cancelled = new AtomicBoolean();
+        private final Object lock = new Object();
+
+        BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T> channel, Flow.Subscriber<? super T> subscriber) {
+            this.owner = owner;
+            this.channel = channel;
+            this.subscriber = subscriber;
+        }
+
+        void start() {
+            subscriber.onSubscribe(this);
+            AsyncSupport.getExecutor().execute(this::drain);
+        }
+
+        @Override
+        public void request(long n) {
+            if (n <= 0) {
+                cancel();
+                subscriber.onError(new IllegalArgumentException(
+                        "Reactive Streams §3.9: request must be positive, got " + n));
+                return;
+            }
+            long prev, next;
+            do {
+                prev = demand.get();
+                next = prev + n;
+                if (next < 0) next = Long.MAX_VALUE; // saturate on overflow
+            } while (!demand.compareAndSet(prev, next));
+            synchronized (lock) { lock.notifyAll(); }
+        }
+
+        @Override
+        public void cancel() {
+            if (!cancelled.compareAndSet(false, true)) return;
+            owner.subscribers.remove(channel);
+            channel.close();
+            synchronized (lock) { lock.notifyAll(); }
+        }
+
+        private void drain() {
+            try {
+                while (!cancelled.get()) {
+                    if (demand.get() == 0) {
+                        synchronized (lock) {
+                            while (demand.get() == 0 && !cancelled.get()) {
+                                lock.wait();
+                            }
+                        }
+                        continue;
+                    }
+                    T item;
+                    try {
+                        item = AsyncSupport.await(channel.receive());
+                    } catch (ChannelClosedException e) {
+                        if (!cancelled.get()) subscriber.onComplete();
+                        return;
+                    }
+                    if (cancelled.get()) return;
+                    subscriber.onNext(item);
+                    long current = demand.get();
+                    if (current != Long.MAX_VALUE) demand.decrementAndGet();
+                }

Review Comment:
   In drain(), exceptions thrown by subscriber.onNext(item) will escape to the 
outer catch and terminate the drain task, but the subscription is not cancelled 
and the per-subscriber channel is not removed or closed. That leaves an undrained 
AsyncChannel in the broadcast subscriber set, which can block future sends 
indefinitely. Catch exceptions around onNext and cancel (remove and close the 
channel) before signalling onError.
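
One possible shape for that guard, sketched as a standalone helper. `GuardedDeliverySketch` and `deliver` are hypothetical names, and the `cleanup` callback stands in for the subscription's `cancel()`. The point is the ordering: cleanup runs before the failure is reported, so the channel is gone from the subscriber set even if `onError` itself misbehaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class GuardedDeliverySketch {
    /**
     * Delivers one item. If onNext throws, runs cleanup first and then
     * reports the failure via onError. Returns false when delivery failed.
     */
    static <T> boolean deliver(Flow.Subscriber<? super T> subscriber, T item, Runnable cleanup) {
        try {
            subscriber.onNext(item);
            return true;
        } catch (Throwable t) {
            cleanup.run(); // stand-in for cancel(): remove and close the channel
            try {
                subscriber.onError(t);
            } catch (Throwable ignored) {
                // A subscriber that also fails in onError cannot be helped further.
            }
            return false;
        }
    }

    /** Records what happens when the second item makes the subscriber throw. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<String> faulty = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) {
                if (item.equals("boom")) throw new IllegalStateException("bad item");
                events.add("onNext:" + item);
            }
            @Override public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        Runnable cleanup = () -> events.add("cleanup");
        deliver(faulty, "a", cleanup);
        deliver(faulty, "boom", cleanup);
        return events;
    }
}
```

In a drain loop, a `false` return would be the cue to stop the loop rather than continue receiving.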



##########
src/main/java/groovy/concurrent/Agent.java:
##########
@@ -179,24 +189,100 @@ public Awaitable<T> sendAndGet(Function<T, T> updateFn) {
     /**
      * Shuts down the agent's update executor. No further updates will
      * be accepted. Pending updates are executed before shutdown completes.
+     * The changes publisher (if any subscribers attached) is closed after
+     * pending updates drain, signalling {@code onComplete} to all live
+     * subscribers. Calling {@code shutdown()} more than once is a no-op.
      */
     public void shutdown() {
+        SubmissionPublisher<T> p;
+        synchronized (lifecycleLock) {
+            if (shutdownInvoked) return;
+            shutdownInvoked = true;
+            p = changesPublisher;
+        }
+        if (p != null) {
+            // Submit close as a terminator task so it runs after any queued
+            // updates have drained — ensures applyUpdate never offers to a
+            // publisher that has already been closed.
+            try {
+                updateExecutor.execute(p::close);
+            } catch (RejectedExecutionException ex) {
+                p.close();
+            }
+        }
         updateExecutor.shutdown();
     }
 
+    /**
+     * Returns a {@link Flow.Publisher} that emits the agent's value after
+     * every successful update. The publisher is hot and per-subscriber:
+     * <ul>
+     *   <li>Subscribers see only changes that occur after they subscribe;
+     *       the current value at subscription time is <em>not</em> replayed.</li>
+     *   <li>Each subscriber gets an independent buffer (default
+     *       {@value #DEFAULT_CHANGES_BUFFER} items).</li>
+     *   <li>Slow subscribers drop the most recent value rather than
+     *       blocking the agent's update thread. Values already buffered
+     *       are delivered in order; only newly-offered values that cannot
+     *       fit are discarded.</li>
+     *   <li>Closes (signals {@code onComplete}) when {@link #shutdown()} is
+     *       called. If {@code changes()} is first called after
+     *       {@code shutdown()}, the returned publisher is already closed
+     *       and subscribers receive {@code onComplete} immediately.</li>
+     * </ul>
+     * <p>
+     * Typical use:
+     * <pre>{@code
+     * for await (newValue in agent.changes()) {
+     *     log.info "Agent value is now {}", newValue
+     * }
+     * }</pre>
+     *
+     * @return a hot publisher of state transitions
+     * @since 6.0.0
+     */
+    public Flow.Publisher<T> changes() {
+        SubmissionPublisher<T> p = changesPublisher;
+        if (p == null) {
+            synchronized (lifecycleLock) {
+                p = changesPublisher;
+                if (p == null) {
+                    p = new SubmissionPublisher<>(
+                            AsyncSupport.getExecutor(), DEFAULT_CHANGES_BUFFER);
+                    if (shutdownInvoked) p.close();
+                    changesPublisher = p;
+                }
+            }
+        }
+        return p;
+    }
+
     @Override
     public String toString() {
         return "Agent[" + get() + "]";
     }
 
     private T applyUpdate(Function<T, T> updateFn) {
         rwLock.writeLock().lock();
+        T newValue;
         try {
             value = updateFn.apply(value);
-            return value;
+            newValue = value;
         } finally {
             rwLock.writeLock().unlock();
         }
+        SubmissionPublisher<T> p = changesPublisher;
+        if (p != null) {
+            try {
+                // Non-blocking offer: drop on slow subscribers rather than
+                // back-pressuring the agent's serialised update loop.
+                p.offer(newValue, (sub, item) -> false);
+            } catch (IllegalStateException ignore) {
+                // Publisher was closed concurrently (changes() created
+                // after shutdown() marked the flag). Drop this emission.
+            }

Review Comment:
   Agent.changes() is documented to drop emissions for slow subscribers (rather 
than backpressuring the update loop), but the test suite here only exercises 
the fast-subscriber path. Add a regression test that creates a 
slow/never-requesting subscriber and asserts that agent updates continue to 
apply (and that some emissions are dropped) to lock in the intended semantics.
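
A rough sketch of what such a regression test could exercise, written against the JDK's `SubmissionPublisher` directly rather than against `Agent` (an assumption, since only the `p.offer(newValue, (sub, item) -> false)` call is visible in the diff). The buffer size of 4, the update count of 100, and the class name are illustrative choices. The subscriber never calls `request`, so its buffer fills and later offers are dropped while the update loop keeps running.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

final class DropOnSlowSubscriberSketch {
    /** Returns {updatesApplied, emissionsDropped}. */
    static int[] run() {
        AtomicInteger dropped = new AtomicInteger();
        int updates = 0;
        try (SubmissionPublisher<Integer> changes =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            changes.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never requests */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 1; i <= 100; i++) {
                updates++; // stands in for the agent applying an update
                // Non-blocking offer; returning false from the handler means "drop, do not retry".
                changes.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        }
        return new int[] { updates, dropped.get() };
    }
}
```

A test built on this idea would assert that every update was applied and that the drop count is positive, without pinning the exact number of drops, which depends on the publisher's internal buffer sizing.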



##########
subprojects/groovy-http-builder/src/test/groovy/groovy/http/MultiExchangeTickerEndToEndTest.groovy:
##########
@@ -0,0 +1,261 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.http
+
+import com.sun.net.httpserver.HttpServer
+import groovy.concurrent.Agent
+import groovy.concurrent.AsyncChannel
+import groovy.concurrent.BroadcastChannel
+import groovy.concurrent.ChannelClosedException
+import groovy.json.JsonSlurper
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static org.apache.groovy.runtime.async.AsyncSupport.await
+
+/**
+ * End-to-end integration of {@link HttpStreamResult#bodyAsLinePublisher()},
+ * {@link BroadcastChannel#asPublisher()} and {@link Agent#changes()}.
+ * <p>
+ * Three mock exchanges stream NDJSON ticks over chunked HTTP. The pipeline
+ * fans them into a single {@link AsyncChannel}, updates a per-symbol stats
+ * {@link Agent}, and routes threshold-crossing alerts through a broadcast
+ * topic. Two independent consumers — one on the agent's change stream, one
+ * on the alert broadcast — verify the wiring end-to-end.
+ *
+ * <pre>
+ *      binance (NDJSON)     coinbase (NDJSON)     kraken (NDJSON)
+ *            │                    │                     │
+ *            ▼                    ▼                     ▼
+ *      getStreamAsync        getStreamAsync       getStreamAsync
+ *            │                    │                     │
+ *            ╰── bodyAsLinePublisher() (Flow.Publisher&lt;String&gt;) ──╮
+ *                                                                  │
+ *                              for await (line in publisher)       │ ← FlowPublisherAdapter
+ *                                          │                       │
+ *                                          ▼                       │
+ *                              AsyncChannel&lt;Map&gt; (unified) ◀───────╯
+ *                                          │
+ *                                          ▼
+ *                              aggregator: stats.send { … }
+ *                                          │
+ *                                          ▼
+ *                       Agent&lt;Map&lt;String, Map&gt;&gt; (per-symbol stats)
+ *                          │                            │
+ *                          │                 if price &gt;= threshold
+ *                          ▼                            ▼
+ *                    agent.changes()        BroadcastChannel&lt;String&gt;
+ *                    Flow.Publisher                    │
+ *                          │              ╭────────────┴─────────────╮
+ *                          │              │                          │
+ *                          ▼              ▼                          ▼
+ *                  Flow.Subscriber   asPublisher()              subscribe()
+ *                  stateSnapshots    Flow.Subscriber             for await
+ *                                    alertsViaPublisher          alertsViaIter
+ * </pre>
+ */
+class MultiExchangeTickerEndToEndTest {
+
+    private final List<HttpServer> servers = []
+
+    @BeforeEach
+    void setup() {
+        // 3 exchanges: each emits a fixed sequence of ticks over chunked HTTP.
+        servers << buildExchange('binance',  [50000, 50100, 50250, 50180])
+        servers << buildExchange('coinbase', [50050, 50120, 50300, 50220])
+        servers << buildExchange('kraken',   [50010, 50090, 50260, 50190])
+    }
+
+    @AfterEach
+    void teardown() {
+        servers.each { it.stop(0) }
+    }
+
+    @Test
+    void aggregatesTicksAcrossExchangesAndPublishesAlerts() {
+        var stats = Agent.create([:].withDefault { [count: 0, last: 0d, max: 0d] })
+        var alertTopic = BroadcastChannel.create()
+        var unified = AsyncChannel.<Map>create(64)
+
+        // Subscribers register FIRST so no early signals are dropped:
+        // SubmissionPublisher drops items with no subscribers, and
+        // BroadcastChannel.subscribe() misses pre-subscription broadcasts.
+
+        var stateSnapshots = [].asSynchronized()
+        var sawSomeUpdates = new CountDownLatch(3)
+        agentChangesSubscriber(stats, stateSnapshots, sawSomeUpdates)
+
+        var alertPublisher = alertTopic.asPublisher()
+        var alertsViaPublisher = [].asSynchronized()
+        var alertsDone = new CountDownLatch(1)
+        alertPublisher.subscribe(new Flow.Subscriber<String>() {
+            @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) }
+            @Override void onNext(String alert) { alertsViaPublisher << alert }
+            @Override void onError(Throwable t) { alertsDone.countDown() }
+            @Override void onComplete() { alertsDone.countDown() }
+        })

Review Comment:
   Several subscribers intentionally ignore onError (empty implementation), and 
the one above only counts down its latch without recording the Throwable. If 
the publisher bridge fails unexpectedly, the test can still pass while silently 
losing signals. Consider capturing the Throwable and failing the test (for 
example, count down the latch in onError, then rethrow or assert after the 
await) to avoid false positives.
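
   One possible shape for this, sketched in plain Java with a JDK `SubmissionPublisher` standing in for the bridge. `RecordingSubscriber` and `awaitAndAssertNoError` are hypothetical helper names, not part of this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FailingOnErrorSketch {

    /** Subscriber that records the first failure instead of swallowing it. */
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { }
        @Override public void onError(Throwable t) {
            failure.compareAndSet(null, t); // keep the first error for the test thread
            done.countDown();
        }
        @Override public void onComplete() { done.countDown(); }

        /** Waits for a terminal signal, then fails if onError was ever signalled. */
        void awaitAndAssertNoError(long timeout, TimeUnit unit) {
            try {
                if (!done.await(timeout, unit)) {
                    throw new AssertionError("no terminal signal within timeout");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
            Throwable t = failure.get();
            if (t != null) {
                throw new AssertionError("publisher signalled onError", t);
            }
        }
    }

    /** Returns true when a publisher-side failure surfaces on the test thread. */
    static boolean detectsError() {
        RecordingSubscriber<String> sub = new RecordingSubscriber<>();
        SubmissionPublisher<String> p = new SubmissionPublisher<>();
        p.subscribe(sub);
        p.closeExceptionally(new IllegalStateException("bridge failed"));
        try {
            sub.awaitAndAssertNoError(5, TimeUnit.SECONDS);
            return false;
        } catch (AssertionError expected) {
            return expected.getCause() instanceof IllegalStateException;
        }
    }
}
```

   In the tests here the same idea would amount to storing `t` in `onError` and asserting it is null after `alertsDone.await(...)`.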



##########
src/test/groovy/groovy/concurrent/BroadcastChannelAsPublisherTest.groovy:
##########
@@ -0,0 +1,203 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent
+
+import org.junit.jupiter.api.Test
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static groovy.test.GroovyAssert.assertScript
+
+final class BroadcastChannelAsPublisherTest {
+
+    @Test
+    void deliversValuesToSingleSubscriber() {
+        BroadcastChannel<String> broadcast = BroadcastChannel.create()
+        Flow.Publisher<String> publisher = broadcast.asPublisher()
+
+        List<String> received = []
+        CountDownLatch done = new CountDownLatch(1)
+
+        publisher.subscribe(new Flow.Subscriber<String>() {
+            @Override void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE) }
+            @Override void onNext(String item) { received << item }
+            @Override void onError(Throwable t) { done.countDown() }
+            @Override void onComplete() { done.countDown() }
+        })
+
+        Thread.sleep(50)  // give subscriber time to wire up
+        broadcast.send('a')
+        broadcast.send('b')
+        broadcast.send('c')

Review Comment:
   These tests rely on Thread.sleep(...) to "give subscriber time to wire up". 
This is timing-sensitive and can be flaky on slow or heavily loaded CI. Prefer 
a CountDownLatch/CompletableFuture that is completed in onSubscribe (or on the 
first onNext) and await that instead of sleeping.
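
   A sketch of the latch-based variant in plain Java. A JDK `SubmissionPublisher` stands in for `broadcast.asPublisher()`. This assumes the bridge calls `onSubscribe` only after the per-subscriber channel is registered, which should be checked against the implementation. If that does not hold, awaiting the first `onNext` is the safer signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class AwaitSubscriptionSketch {

    /** Publishes a, b, c only after the subscriber has confirmed it is wired up. */
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch subscribed = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
                subscribed.countDown(); // replaces the Thread.sleep(50) guess
            }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        try {
            if (!subscribed.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("subscriber was never wired up");
            }
            publisher.submit("a");
            publisher.submit("b");
            publisher.submit("c");
            publisher.close(); // triggers onComplete once buffered items are delivered
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("no terminal signal within timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
        return received;
    }
}
```

   The bounded `await` keeps a wiring bug from hanging the build: the test fails with a clear message instead of relying on a sleep that happens to be long enough.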



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.