[
https://issues.apache.org/jira/browse/GROOVY-11953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18076968#comment-18076968
]
ASF GitHub Bot commented on GROOVY-11953:
-----------------------------------------
Copilot commented on code in PR #2483:
URL: https://github.com/apache/groovy/pull/2483#discussion_r3158549838
##########
src/test/groovy/groovy/concurrent/BroadcastChannelAsPublisherTest.groovy:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent
+
+import org.junit.jupiter.api.Test
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static groovy.test.GroovyAssert.assertScript
+
+final class BroadcastChannelAsPublisherTest {
+
+ @Test
+ void deliversValuesToSingleSubscriber() {
+ BroadcastChannel<String> broadcast = BroadcastChannel.create()
+ Flow.Publisher<String> publisher = broadcast.asPublisher()
+
+ List<String> received = []
+ CountDownLatch done = new CountDownLatch(1)
+
+ publisher.subscribe(new Flow.Subscriber<String>() {
+ @Override void onSubscribe(Flow.Subscription s) {
s.request(Long.MAX_VALUE) }
+ @Override void onNext(String item) { received << item }
+ @Override void onError(Throwable t) { done.countDown() }
+ @Override void onComplete() { done.countDown() }
+ })
+
+ Thread.sleep(50) // give subscriber time to wire up
+ broadcast.send('a')
Review Comment:
The `Thread.sleep(50)` calls intended to "give subscriber time to wire up"
make these tests timing-dependent and slower than necessary. Since
`subscribe(...)` invokes `onSubscribe` synchronously, you can usually remove
the sleep or replace it with a deterministic latch/condition (e.g., wait until
you have the `Subscription`, or until `broadcast.subscriberCount` reflects the
subscription) before producing values.
##########
src/main/java/groovy/concurrent/BroadcastChannel.java:
##########
@@ -140,4 +144,149 @@ public boolean isClosed() {
public int getSubscriberCount() {
return subscribers.size();
}
+
+ /**
+ * Returns a {@link Flow.Publisher} view of this broadcast channel. Each
+ * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the
returned
+ * publisher creates a new {@link AsyncChannel} subscriber under the hood,
+ * draining values to the downstream subscriber according to its requested
+ * demand.
+ * <p>
+ * Semantics:
+ * <ul>
+ * <li>Cold per-subscribe binding: each subscription starts seeing values
+ * from the moment it subscribes (consistent with
+ * {@link #subscribe()}).</li>
+ * <li>Backpressure: respects {@code request(n)}; the worker blocks the
+ * broadcast send when no demand exists (sender-side
backpressure).</li>
+ * <li>Cancellation: closes this subscriber's channel and removes it
+ * from the broadcast's subscriber set.</li>
+ * <li>Completion: signals {@code onComplete} when the broadcast channel
+ * is closed and the per-subscriber buffer drained.</li>
+ * </ul>
+ * <p>
+ * <b>Backpressure policy (important).</b> This bridge uses lossless,
+ * sender-gated backpressure: {@link #send(Object)} awaits delivery to
+ * every live subscriber, and each per-subscriber channel has a bounded
+ * buffer (default 16). A subscriber that never calls {@code request(n)},
+ * or that requests slowly, will fill its buffer; once full, the
+ * subscriber's channel suspends its backing {@code send}, which in turn
+ * stalls {@code BroadcastChannel.send(...)} for <em>all</em>
+ * subscribers. In other words, the slowest subscriber controls producer
+ * throughput.
+ * <p>
+ * This is intentional and matches the point-to-point semantics of
+ * {@link #subscribe()}: values are neither dropped nor reordered. If
+ * you need decoupled per-subscriber policies (drop-newest, drop-oldest,
+ * latest-only, or unbounded buffering), wrap the publisher with a
+ * Reactive Streams operator of your choice, or use a subscriber that
+ * drains promptly with {@code request(Long.MAX_VALUE)}.
+ *
+ * @return a {@code Flow.Publisher} backed by per-subscriber channels
+ * @since 6.0.0
+ */
+ public Flow.Publisher<T> asPublisher() {
+ return new BroadcastFlowPublisher();
+ }
+
+ private final class BroadcastFlowPublisher implements Flow.Publisher<T> {
+ @Override
+ public void subscribe(Flow.Subscriber<? super T> downstream) {
+ Objects.requireNonNull(downstream, "subscriber must not be null");
+ AsyncChannel<T> channel;
+ try {
+ channel = BroadcastChannel.this.subscribe();
+ } catch (ChannelClosedException e) {
+ downstream.onSubscribe(NOOP_SUBSCRIPTION);
+ downstream.onError(e);
+ return;
+ }
+ new BroadcastFlowSubscription<>(BroadcastChannel.this, channel,
downstream).start();
+ }
+ }
+
+ private static final Flow.Subscription NOOP_SUBSCRIPTION = new
Flow.Subscription() {
+ @Override public void request(long n) { }
+ @Override public void cancel() { }
+ };
+
+ /**
+ * Per-subscriber bridge that converts a backing {@link AsyncChannel} into
+ * a Reactive Streams subscription with demand tracking.
+ */
+ private static final class BroadcastFlowSubscription<T> implements
Flow.Subscription {
+ private final BroadcastChannel<T> owner;
+ private final AsyncChannel<T> channel;
+ private final Flow.Subscriber<? super T> subscriber;
+ private final AtomicLong demand = new AtomicLong();
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+ private final Object lock = new Object();
+
+ BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T>
channel, Flow.Subscriber<? super T> subscriber) {
+ this.owner = owner;
+ this.channel = channel;
+ this.subscriber = subscriber;
+ }
+
+ void start() {
+ subscriber.onSubscribe(this);
+ AsyncSupport.getExecutor().execute(this::drain);
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ cancel();
+ subscriber.onError(new IllegalArgumentException(
+ "Reactive Streams §3.9: request must be positive, got
" + n));
+ return;
Review Comment:
`request(long n)` calls `subscriber.onError(...)` directly on the caller
thread. If a downstream calls `request(0)`/`request(-1)` from a different
thread while `drain()` is concurrently calling `onNext`, this can violate the
Reactive Streams rule that signals (`onNext`/`onError`/`onComplete`) must be
serialized (non-concurrent). Consider routing this terminal error through the
same single-threaded drain path (e.g., record a terminal error + wake drain) so
all downstream signals come from one place.
##########
src/spec/doc/core-concurrent-actors.adoc:
##########
@@ -93,6 +93,40 @@ inventory.send { state ->
assert await(inventory.getAsync()) == [apples: 20, bananas: 10]
----
+=== Observing state changes
+
+An agent exposes a `Flow.Publisher<T>` of state transitions via
+`changes()`. Each successful update emits the new value to every
+subscriber that has subscribed since its last reset. The stream is
+hot (no replay of prior state), per-subscriber buffered, and closes
+with `onComplete` when `shutdown()` is called:
Review Comment:
The wording "subscribed since its last reset" is misleading here: `Agent`
doesn't expose a reset concept, and `changes()` is purely hot/no-replay
(subscribers see updates after they subscribe). Please rephrase to avoid
implying a resettable/replay boundary that doesn't exist.
```suggestion
subscriber that is already subscribed at the time of the update. The
stream is hot (no replay of prior state), per-subscriber buffered,
and closes with `onComplete` when `shutdown()` is called:
```
##########
src/main/java/groovy/concurrent/Agent.java:
##########
@@ -179,24 +189,100 @@ public Awaitable<T> sendAndGet(Function<T, T> updateFn) {
/**
* Shuts down the agent's update executor. No further updates will
* be accepted. Pending updates are executed before shutdown completes.
+ * The changes publisher (if any subscribers attached) is closed after
+ * pending updates drain, signalling {@code onComplete} to all live
+ * subscribers. Calling {@code shutdown()} more than once is a no-op.
*/
public void shutdown() {
+ SubmissionPublisher<T> p;
+ synchronized (lifecycleLock) {
+ if (shutdownInvoked) return;
+ shutdownInvoked = true;
+ p = changesPublisher;
+ }
+ if (p != null) {
+ // Submit close as a terminator task so it runs after any queued
+ // updates have drained — ensures applyUpdate never offers to a
+ // publisher that has already been closed.
+ try {
+ updateExecutor.execute(p::close);
+ } catch (RejectedExecutionException ex) {
+ p.close();
+ }
+ }
updateExecutor.shutdown();
}
+ /**
+ * Returns a {@link Flow.Publisher} that emits the agent's value after
+ * every successful update. The publisher is hot and per-subscriber:
+ * <ul>
+ * <li>Subscribers see only changes that occur after they subscribe;
+ * the current value at subscription time is <em>not</em>
replayed.</li>
+ * <li>Each subscriber gets an independent buffer (default
+ * {@value #DEFAULT_CHANGES_BUFFER} items).</li>
+ * <li>Slow subscribers drop the most recent value rather than
+ * blocking the agent's update thread. Values already buffered
+ * are delivered in order; only newly-offered values that cannot
+ * fit are discarded.</li>
+ * <li>Closes (signals {@code onComplete}) when {@link #shutdown()} is
+ * called. If {@code changes()} is first called after
+ * {@code shutdown()}, the returned publisher is already closed
+ * and subscribers receive {@code onComplete} immediately.</li>
+ * </ul>
+ * <p>
+ * Typical use:
+ * <pre>{@code
+ * for await (newValue in agent.changes()) {
+ * log.info "Agent value is now {}", newValue
+ * }
+ * }</pre>
+ *
+ * @return a hot publisher of state transitions
+ * @since 6.0.0
+ */
+ public Flow.Publisher<T> changes() {
+ SubmissionPublisher<T> p = changesPublisher;
+ if (p == null) {
+ synchronized (lifecycleLock) {
+ p = changesPublisher;
+ if (p == null) {
+ p = new SubmissionPublisher<>(
+ AsyncSupport.getExecutor(),
DEFAULT_CHANGES_BUFFER);
+ if (shutdownInvoked) p.close();
+ changesPublisher = p;
+ }
+ }
+ }
+ return p;
+ }
+
@Override
public String toString() {
return "Agent[" + get() + "]";
}
private T applyUpdate(Function<T, T> updateFn) {
rwLock.writeLock().lock();
+ T newValue;
try {
value = updateFn.apply(value);
- return value;
+ newValue = value;
} finally {
rwLock.writeLock().unlock();
}
+ SubmissionPublisher<T> p = changesPublisher;
+ if (p != null) {
+ try {
+ // Non-blocking offer: drop on slow subscribers rather than
+ // back-pressuring the agent's serialised update loop.
+ p.offer(newValue, (sub, item) -> false);
+ } catch (IllegalStateException ignore) {
+ // Publisher was closed concurrently (changes() created
+ // after shutdown() marked the flag). Drop this emission.
+ }
Review Comment:
The inline comment says this `offer` is used to "drop on slow subscribers",
but the `onDrop` predicate always returns `false`, and then
`IllegalStateException` is swallowed. This makes the drop behavior
implicit/exception-driven and risks dropping emissions more broadly than
intended. Consider using an explicit `onDrop` policy that matches the
documented semantics (e.g., return the value that indicates dropping for the
lagging subscriber) and avoid relying on catching `IllegalStateException` for
normal flow control.
##########
src/test/groovy/groovy/concurrent/AgentChangesTest.groovy:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.concurrent
+
+import org.junit.jupiter.api.Test
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static groovy.test.GroovyAssert.assertScript
+
+final class AgentChangesTest {
+
+ @Test
+ void emitsValueAfterEachUpdate() {
+ Agent<Integer> agent = Agent.create(0)
+ try {
+ List<Integer> received = Collections.synchronizedList([])
+ CountDownLatch sawThree = new CountDownLatch(3)
+
+ agent.changes().subscribe(new Flow.Subscriber<Integer>() {
+ @Override void onSubscribe(Flow.Subscription s) {
s.request(Long.MAX_VALUE) }
+ @Override void onNext(Integer item) { received << item;
sawThree.countDown() }
+ @Override void onError(Throwable t) {}
+ @Override void onComplete() {}
+ })
+ Thread.sleep(50) // let subscription register
+
Review Comment:
Several tests use `Thread.sleep(50)` to "let subscription register" after
calling `agent.changes().subscribe(...)`. Since the subscription handshake
(`onSubscribe`) happens during `subscribe`, these sleeps are typically
unnecessary and can introduce avoidable timing sensitivity; prefer a
latch/condition tied to the subscription (or remove the sleep if not needed).
##########
src/main/java/groovy/concurrent/BroadcastChannel.java:
##########
@@ -140,4 +144,149 @@ public boolean isClosed() {
public int getSubscriberCount() {
return subscribers.size();
}
+
+ /**
+ * Returns a {@link Flow.Publisher} view of this broadcast channel. Each
+ * call to {@link Flow.Publisher#subscribe(Flow.Subscriber)} on the
returned
+ * publisher creates a new {@link AsyncChannel} subscriber under the hood,
+ * draining values to the downstream subscriber according to its requested
+ * demand.
+ * <p>
+ * Semantics:
+ * <ul>
+ * <li>Cold per-subscribe binding: each subscription starts seeing values
+ * from the moment it subscribes (consistent with
+ * {@link #subscribe()}).</li>
+ * <li>Backpressure: respects {@code request(n)}; the worker blocks the
+ * broadcast send when no demand exists (sender-side
backpressure).</li>
+ * <li>Cancellation: closes this subscriber's channel and removes it
+ * from the broadcast's subscriber set.</li>
+ * <li>Completion: signals {@code onComplete} when the broadcast channel
+ * is closed and the per-subscriber buffer drained.</li>
+ * </ul>
+ * <p>
+ * <b>Backpressure policy (important).</b> This bridge uses lossless,
+ * sender-gated backpressure: {@link #send(Object)} awaits delivery to
+ * every live subscriber, and each per-subscriber channel has a bounded
+ * buffer (default 16). A subscriber that never calls {@code request(n)},
+ * or that requests slowly, will fill its buffer; once full, the
+ * subscriber's channel suspends its backing {@code send}, which in turn
+ * stalls {@code BroadcastChannel.send(...)} for <em>all</em>
+ * subscribers. In other words, the slowest subscriber controls producer
+ * throughput.
+ * <p>
+ * This is intentional and matches the point-to-point semantics of
+ * {@link #subscribe()}: values are neither dropped nor reordered. If
+ * you need decoupled per-subscriber policies (drop-newest, drop-oldest,
+ * latest-only, or unbounded buffering), wrap the publisher with a
+ * Reactive Streams operator of your choice, or use a subscriber that
+ * drains promptly with {@code request(Long.MAX_VALUE)}.
+ *
+ * @return a {@code Flow.Publisher} backed by per-subscriber channels
+ * @since 6.0.0
+ */
+ public Flow.Publisher<T> asPublisher() {
+ return new BroadcastFlowPublisher();
+ }
+
+ private final class BroadcastFlowPublisher implements Flow.Publisher<T> {
+ @Override
+ public void subscribe(Flow.Subscriber<? super T> downstream) {
+ Objects.requireNonNull(downstream, "subscriber must not be null");
+ AsyncChannel<T> channel;
+ try {
+ channel = BroadcastChannel.this.subscribe();
+ } catch (ChannelClosedException e) {
+ downstream.onSubscribe(NOOP_SUBSCRIPTION);
+ downstream.onError(e);
+ return;
+ }
+ new BroadcastFlowSubscription<>(BroadcastChannel.this, channel,
downstream).start();
+ }
+ }
+
+ private static final Flow.Subscription NOOP_SUBSCRIPTION = new
Flow.Subscription() {
+ @Override public void request(long n) { }
+ @Override public void cancel() { }
+ };
+
+ /**
+ * Per-subscriber bridge that converts a backing {@link AsyncChannel} into
+ * a Reactive Streams subscription with demand tracking.
+ */
+ private static final class BroadcastFlowSubscription<T> implements
Flow.Subscription {
+ private final BroadcastChannel<T> owner;
+ private final AsyncChannel<T> channel;
+ private final Flow.Subscriber<? super T> subscriber;
+ private final AtomicLong demand = new AtomicLong();
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+ private final Object lock = new Object();
+
+ BroadcastFlowSubscription(BroadcastChannel<T> owner, AsyncChannel<T>
channel, Flow.Subscriber<? super T> subscriber) {
+ this.owner = owner;
+ this.channel = channel;
+ this.subscriber = subscriber;
+ }
+
+ void start() {
+ subscriber.onSubscribe(this);
+ AsyncSupport.getExecutor().execute(this::drain);
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ cancel();
+ subscriber.onError(new IllegalArgumentException(
+ "Reactive Streams §3.9: request must be positive, got
" + n));
+ return;
+ }
+ long prev, next;
+ do {
+ prev = demand.get();
+ next = prev + n;
+ if (next < 0) next = Long.MAX_VALUE; // saturate on overflow
+ } while (!demand.compareAndSet(prev, next));
+ synchronized (lock) { lock.notifyAll(); }
+ }
+
+ @Override
+ public void cancel() {
+ if (!cancelled.compareAndSet(false, true)) return;
+ owner.subscribers.remove(channel);
+ channel.close();
+ synchronized (lock) { lock.notifyAll(); }
+ }
+
+ private void drain() {
+ try {
+ while (!cancelled.get()) {
+ if (demand.get() == 0) {
+ synchronized (lock) {
+ while (demand.get() == 0 && !cancelled.get()) {
+ lock.wait();
Review Comment:
In the drain loop, terminal completion is effectively backpressured: when
`demand == 0` the worker waits on `lock`, so a subscriber that never calls
`request(n)` will never observe `onComplete` after the broadcast/channel is
closed (and the drain task can remain parked indefinitely). Reactive Streams
terminal signals must not depend on demand. Consider checking
`channel.isClosed()`/`channel.getBufferedSize()` while waiting (or otherwise
unblocking on close) and emitting `onComplete` once the channel is
closed+drained even with zero demand.
```suggestion
if (channel.isClosed() &&
channel.getBufferedSize() == 0) {
subscriber.onComplete();
return;
}
lock.wait(100L);
```
##########
subprojects/groovy-http-builder/src/test/groovy/groovy/http/MultiExchangeTickerEndToEndTest.groovy:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package groovy.http
+
+import com.sun.net.httpserver.HttpServer
+import groovy.concurrent.Agent
+import groovy.concurrent.AsyncChannel
+import groovy.concurrent.BroadcastChannel
+import groovy.concurrent.ChannelClosedException
+import groovy.json.JsonSlurper
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Flow
+import java.util.concurrent.TimeUnit
+
+import static org.apache.groovy.runtime.async.AsyncSupport.await
+
+/**
+ * End-to-end integration of {@link HttpStreamResult#bodyAsLinePublisher()},
+ * {@link BroadcastChannel#asPublisher()} and {@link Agent#changes()}.
+ * <p>
+ * Three mock exchanges stream NDJSON ticks over chunked HTTP. The pipeline
+ * fans them into a single {@link AsyncChannel}, updates a per-symbol stats
+ * {@link Agent}, and routes threshold-crossing alerts through a broadcast
+ * topic. Two independent consumers — one on the agent's change stream, one
+ * on the alert broadcast — verify the wiring end-to-end.
+ *
+ * <pre>
+ * binance (NDJSON) coinbase (NDJSON) kraken (NDJSON)
+ * │ │ │
+ * ▼ ▼ ▼
+ * getStreamAsync getStreamAsync getStreamAsync
+ * │ │ │
+ * ╰── bodyAsLinePublisher() (Flow.Publisher<String>) ──╮
+ * │
+ * for await (line in publisher) │ ←
FlowPublisherAdapter
+ * │ │
+ * ▼ │
+ * AsyncChannel<Map> (unified) ◀───────╯
+ * │
+ * ▼
+ * aggregator: stats.send { … }
+ * │
+ * ▼
+ * Agent<Map<String, Map>> (per-symbol stats)
+ * │ │
+ * │ if price >= threshold
+ * ▼ ▼
+ * agent.changes() BroadcastChannel<String>
+ * Flow.Publisher │
+ * │ ╭────────────┴─────────────╮
+ * │ │ │
+ * ▼ ▼ ▼
+ * Flow.Subscriber asPublisher() subscribe()
+ * stateSnapshots Flow.Subscriber for await
Review Comment:
The class-level diagram/comment says the broadcast Iterable consumer uses
`for await`, but the test code actually iterates the `AsyncChannel` with a
regular `for (a in alertChannel)`. Either update the diagram or use `for await`
in the code to keep the end-to-end narrative consistent.
```suggestion
* stateSnapshots Flow.Subscriber for (a in
alertChannel)
```
> Publisher Mesh Logic for async with GPars
> -----------------------------------------
>
> Key: GROOVY-11953
> URL: https://issues.apache.org/jira/browse/GROOVY-11953
> Project: Groovy
> Issue Type: New Feature
> Reporter: Paul King
> Assignee: Paul King
> Priority: Major
>
> Follows on from GROOVY-11952
--
This message was sent by Atlassian Jira
(v8.20.10#820010)