[
https://issues.apache.org/jira/browse/GROOVY-12033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083263#comment-18083263
]
ASF GitHub Bot commented on GROOVY-12033:
-----------------------------------------
Copilot commented on code in PR #2555:
URL: https://github.com/apache/groovy/pull/2555#discussion_r3297473012
##########
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java:
##########
@@ -89,33 +146,109 @@ public boolean isActive() {
}
@Override
- @SuppressWarnings("unchecked")
public void stop() {
if (!active) return;
active = false;
- // Poison pill signals the processing loop to exit after draining
- queue.add(new Envelope<>(POISON, null));
+ // Best-effort wake-up for an idle take(). If the queue is full
+ // (bounded mailbox), offer() fails and that is fine: the loop is
+ // not blocked, and the post-message !active check below will
+ // terminate processing once the queue drains. Never block here —
+ // stop() may be called from the actor's own thread, and put() on
+ // a full bounded queue would deadlock against the only consumer.
+ queue.offer(new Envelope<>(POISON, null));
+ }
+
+ @Override
+ public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) {
+ Objects.requireNonNull(handler, "handler must not be null");
+ // Adapt the context-free consumer to the internally-uniform shape.
+ this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg);
+ return this;
+ }
+
+ @Override
+ public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? super T> handler) {
+ Objects.requireNonNull(handler, "handler must not be null");
+ this.errorHandler = handler;
+ return this;
}
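
The reasoning in the stop() comment can be checked with plain java.util.concurrent, independent of DefaultActor. The following is a minimal sketch, assuming a capacity-1 LinkedBlockingQueue stands in for a bounded mailbox. The class name, the wakeUp helper and the POISON object are hypothetical and not taken from the PR.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the PR under review.
class OfferVsPutDemo {
    static final Object POISON = new Object();

    // Best-effort wake-up: returns true if the pill was enqueued, and never blocks.
    static boolean wakeUp(LinkedBlockingQueue<Object> mailbox) {
        return mailbox.offer(POISON);
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Object> full = new LinkedBlockingQueue<>(1);
        full.put("pending message");

        // The mailbox is full, so offer() returns false immediately.
        System.out.println("offer on full mailbox: " + wakeUp(full));

        // A timed offer stands in for put(): with no consumer running it cannot
        // succeed, which is the situation when the caller is the only consumer.
        boolean accepted = full.offer(POISON, 50, TimeUnit.MILLISECONDS);
        System.out.println("timed offer on full mailbox: " + accepted);

        // On an empty mailbox the pill goes in and would wake an idle take().
        LinkedBlockingQueue<Object> idle = new LinkedBlockingQueue<>(1);
        System.out.println("offer on empty mailbox: " + wakeUp(idle));
    }
}
```

In the full-mailbox case the consumer is busy rather than parked in take(), so a failed offer() loses nothing as long as the loop re-checks the active flag after each message, as the comment in the diff describes.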
// --
> groovy.concurrent.Actor: pre-GA hardening — Stop sentinel, error callback,
> bounded mailbox, per-actor pool
> ----------------------------------------------------------------------------------------------------------
>
> Key: GROOVY-12033
> URL: https://issues.apache.org/jira/browse/GROOVY-12033
> Project: Groovy
> Issue Type: Improvement
> Reporter: Paul King
> Assignee: Paul King
> Priority: Major
> Fix For: 6.0.0-alpha-2
>
>
> h2. Summary
> Add four production-readiness improvements to {{groovy.concurrent.Actor}}
> introduced in alpha-1:
> # {{Actor.Stop}} return-value sentinel for in-handler self-termination
> # {{onError}} callback so handler exceptions are no longer silently swallowed
> for fire-and-forget {{send}}
> # Bounded mailbox with overflow strategy ({{BLOCK}} / {{DROP_NEWEST}} /
> {{FAIL}})
> # Per-actor executor selection
> All changes are additive and backward compatible. Existing factory methods
> and the existing {{ActorTest}} pass unchanged.
> h2. Public API additions
> h3. {{groovy.concurrent.Actor}}
> * {{Object Stop}} — public sentinel constant. A handler returning
> {{Actor.Stop}} causes the actor to stop gracefully after the current message
> (semantics of {{stop()}}: queued messages still drain). For {{sendAndGet}}
> callers the reply is bound with the {{Stop}} sentinel itself; detect by
> reference equality.
> * {{default Actor<T> onError(BiFunction<Throwable, ? super T, ?> handler)}} —
> registers a handler invoked when message processing throws. Receives
> {{(throwable, message)}}. Return value is a control signal: {{Actor.Stop}}
> stops the actor, anything else continues. Default implementation throws
> {{UnsupportedOperationException}}; {{DefaultActor}} overrides.
> * {{static Actor<T> reactor(Function<T,R> handler, ActorOptions options)}} —
> new overload.
> * {{static Actor<T> stateful(S initial, BiFunction<S,T,S> handler,
> ActorOptions options)}} — new overload.
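
To make the Stop semantics above concrete, here is a small single-threaded model of "stop after the current message, but drain what is already queued". It is only a sketch: StopSentinelSketch, STOP and drain are hypothetical names, and the assumption that a stopped actor refuses new messages by returning false is mine, not something the description states.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical single-threaded model; not the DefaultActor implementation.
class StopSentinelSketch {
    // Stand-in for Actor.Stop: a unique object that is compared by reference.
    static final Object STOP = new Object();

    private final Queue<String> mailbox = new ArrayDeque<>();
    private final Function<String, Object> handler;
    private boolean active = true;
    final List<Object> replies = new ArrayList<>();

    StopSentinelSketch(Function<String, Object> handler) {
        this.handler = handler;
    }

    // Assumption: a stopped actor refuses new messages, modelled here as 'false'.
    boolean send(String msg) {
        if (!active) return false;
        mailbox.add(msg);
        return true;
    }

    // Processes everything already queued, even after the handler asks to stop.
    void drain() {
        String msg;
        while ((msg = mailbox.poll()) != null) {
            Object result = handler.apply(msg);
            replies.add(result);                 // the reply carries the sentinel itself
            if (result == STOP) active = false;  // reference equality, not equals()
        }
    }

    public static void main(String[] args) {
        StopSentinelSketch actor = new StopSentinelSketch(
                m -> m.equals("quit") ? STOP : m.toUpperCase());
        actor.send("a");
        actor.send("quit");
        actor.send("b");
        actor.drain();
        System.out.println(actor.replies.get(1) == STOP); // prints "true"
        System.out.println(actor.replies.get(2));         // prints "B"
        System.out.println(actor.send("c"));              // prints "false"
    }
}
```

The message "b" was queued before the handler returned the sentinel, so it is still processed. Only sends that arrive after the stop are turned away.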
> h3. New: {{groovy.concurrent.ActorOptions}} (record)
> {code:java}
> public record ActorOptions(int mailboxCapacity, Overflow overflow, Executor
> executor) {
> public enum Overflow { BLOCK, DROP_NEWEST, FAIL }
> public static final ActorOptions DEFAULTS;
> public ActorOptions withBoundedMailbox(int capacity, Overflow strategy);
> public ActorOptions withExecutor(Executor executor);
> public boolean isBounded();
> }
> {code}
> * {{mailboxCapacity == 0}} → unbounded (current behaviour).
> * {{executor == null}} → uses {{AsyncSupport.getExecutor()}} (current
> behaviour).
> * Canonical constructor rejects negative capacity and null overflow.
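
One way the validation rules above could look in a record is sketched below. ActorOptionsSketch is a hypothetical stand-in, not the real ActorOptions. The exception types, the BLOCK default for overflow, and the rejection of non-positive capacities in withBoundedMailbox are assumptions; the description only says that negative capacity and null overflow are rejected, and a test name suggests zero is rejected by withBoundedMailbox.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the record described above; requires Java 16+.
record ActorOptionsSketch(int mailboxCapacity,
                          ActorOptionsSketch.Overflow overflow,
                          Executor executor) {

    enum Overflow { BLOCK, DROP_NEWEST, FAIL }

    // Assumption: unbounded mailbox, BLOCK policy, no explicit executor.
    static final ActorOptionsSketch DEFAULTS =
            new ActorOptionsSketch(0, Overflow.BLOCK, null);

    // Compact canonical constructor: rejects negative capacity and null overflow.
    ActorOptionsSketch {
        if (mailboxCapacity < 0) {
            throw new IllegalArgumentException("mailboxCapacity must be >= 0");
        }
        Objects.requireNonNull(overflow, "overflow must not be null");
    }

    // Assumption: a bounded mailbox needs at least one slot.
    ActorOptionsSketch withBoundedMailbox(int capacity, Overflow strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        return new ActorOptionsSketch(capacity, strategy, executor);
    }

    ActorOptionsSketch withExecutor(Executor newExecutor) {
        return new ActorOptionsSketch(mailboxCapacity, overflow, newExecutor);
    }

    // Capacity 0 means unbounded, matching the first bullet above.
    boolean isBounded() {
        return mailboxCapacity > 0;
    }

    public static void main(String[] args) {
        ActorOptionsSketch bounded = DEFAULTS.withBoundedMailbox(16, Overflow.FAIL);
        System.out.println(DEFAULTS.isBounded()); // prints "false"
        System.out.println(bounded.isBounded());  // prints "true"
    }
}
```

Because records are immutable, each with-method returns a new instance and DEFAULTS can be shared safely between actors.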
> h2. Behavioural changes in {{org.apache.groovy.runtime.async.DefaultActor}}
> * Constructor now takes {{ActorOptions}}; queue is
> {{LinkedBlockingQueue(capacity)}} when bounded.
> * {{send(T)}} routes through an {{enqueue(...)}} helper that honours the
> overflow policy:
> ** {{BLOCK}} → {{queue.put}} (back-pressures the sending thread; on interrupt
> restores the flag and throws a wrapping {{RuntimeException}}).
> ** {{DROP_NEWEST}} → {{queue.offer}}; on overflow the message is silently
> dropped, and {{sendAndGet}} replies bind an {{IllegalStateException}} so
> awaiters don't hang.
> ** {{FAIL}} → {{queue.offer}}; on overflow throws {{IllegalStateException}}.
> {{sendAndGet}} additionally binds the reply with the same exception before
> rethrowing.
> * {{processLoop}} checks {{result == Actor.Stop}} after the handler runs and
> calls {{stop()}} (graceful — pending messages drain).
> * {{processLoop}}'s catch block now invokes the registered {{onError}}
> handler if present, in addition to binding the failure on the reply for
> {{sendAndGet}}. If the {{onError}} handler returns {{Actor.Stop}} the actor
> stops. Exceptions from the {{onError}} handler itself are caught and
> discarded so the processing loop cannot be destabilised.
> * {{StatefulProcessor.process}} preserves the prior state when the handler
> returns {{Actor.Stop}}, so any messages queued behind the Stop-trigger still
> observe real state during drain.
> * {{stop()}} now offers the poison pill on a possibly-bounded queue, falling
> back to {{put}} so termination always succeeds.
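
The three overflow policies listed under send(T) map directly onto BlockingQueue operations. The sketch below shows one possible shape for such an enqueue helper. EnqueueSketch and its boolean return value are hypothetical, and the handling of sendAndGet replies is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper; not the enqueue(...) method in DefaultActor.
class EnqueueSketch {
    enum Overflow { BLOCK, DROP_NEWEST, FAIL }

    // Returns true if the message was accepted, false if it was dropped.
    static <T> boolean enqueue(BlockingQueue<T> queue, T msg, Overflow policy) {
        switch (policy) {
            case BLOCK:
                try {
                    queue.put(msg); // parks the sender until a slot frees up
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag
                    throw new RuntimeException("interrupted while enqueueing", e);
                }
            case DROP_NEWEST:
                return queue.offer(msg); // false means the new message was dropped
            case FAIL:
                if (!queue.offer(msg)) {
                    throw new IllegalStateException("mailbox is full");
                }
                return true;
            default:
                throw new AssertionError(policy);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> mailbox = new LinkedBlockingQueue<>(1);
        System.out.println(enqueue(mailbox, "first", Overflow.FAIL));         // prints "true"
        System.out.println(enqueue(mailbox, "second", Overflow.DROP_NEWEST)); // prints "false"
        try {
            enqueue(mailbox, "third", Overflow.FAIL);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "mailbox is full"
        }
    }
}
```

With DROP_NEWEST the queue keeps what it already holds and discards the incoming message, which is why the sendAndGet path has to complete the reply with an error rather than leave the caller waiting.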
> h2. Files touched
> || File || Change ||
> | {{src/main/java/groovy/concurrent/Actor.java}} | Modified — Stop sentinel,
> onError default, 2 factory overloads, doc updates on {{send}} |
> | {{src/main/java/groovy/concurrent/ActorOptions.java}} | New |
> | {{src/main/java/org/apache/groovy/runtime/async/DefaultActor.java}} |
> Modified — see behavioural changes above |
> | {{src/test/groovy/groovy/concurrent/ActorTest.groovy}} | Extended — 13 new
> tests (see below) |
> h2. Tests
> 13 new tests, all passing alongside the existing 13:
> * {{testStatefulSelfStopsOnStopSentinel}},
> {{testReactorSelfStopsOnStopSentinel}}
> * {{testStopSentinelDrainsQueuedMessages}} — confirms FIFO drain after Stop
> * {{testStatePreservedAcrossStopSentinel}} — confirms StatefulProcessor
> doesn't overwrite state with Stop
> * {{testOnErrorFiresForFireAndForgetException}}
> * {{testOnErrorAlsoFiresForSendAndGet}} — confirms both the {{Awaitable}}
> failure path and the {{onError}} callback fire
> * {{testOnErrorReturningStopTerminatesActor}}
> * {{testOnErrorHandlerExceptionIsSwallowed}} — confirms a throwing
> {{onError}} doesn't break the loop
> * {{testBoundedMailboxFailOverflowThrows}}
> * {{testBoundedMailboxDropNewest}}
> * {{testBoundedMailboxDropNewestReplyBindsError}} — overflow on
> {{sendAndGet}} surfaces via the {{Awaitable}}
> * {{testBoundedMailboxBlockBackpressures}} — confirms the sender thread
> actually parks until a slot frees
> * {{testPerActorExecutorIsUsed}} — confirms the handler runs on the supplied
> executor's thread
> * {{testActorOptionsRejectsNegativeCapacity}},
> {{testActorOptionsWithBoundedMailboxRejectsZero}}
> Full suite of 135 tests across the 18 {{groovy.concurrent.*}} test classes
> still passes.
> h2. Out of scope / deferred
> Deferred to a future ticket (proposed for Groovy 7):
> * Idle / receive timeout ({{react(timeout)}} from GPars,
> {{setReceiveTimeout}} from Pekko)
> * Restart-with-backoff supervision policy
> * Per-actor {{withTimers}} scheduling hub
> * Stash / unstash
> * {{become}}-style behaviour swap
> Not adopted (out of scope for {{groovy.concurrent}} entirely): actor
> hierarchy with supervision trees, remote actors, persistence, sharding. These
> are Pekko's domain; {{groovy.concurrent.Actor}} is for serialising in-process
> mutable state.
> h2. Compatibility
> * Source compatible: no existing signature on the {{Actor}} interface or
> {{DefaultActor}} factory methods changed.
> * Binary compatible for all existing call sites; existing 2-arg factories
> route through {{ActorOptions.DEFAULTS}}.
> * {{onError}} is a {{default}} method, so existing implementors of {{Actor}}
> compile unchanged (calls to {{onError}} on a non-{{DefaultActor}}
> implementation throw {{UnsupportedOperationException}}).
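
The compatibility argument for onError rests on how Java default methods behave. The sketch below shows the pattern in isolation. ActorLike, LegacyActor and supportsOnError are hypothetical names, and the signature simply mirrors the BiFunction form quoted in the description.

```java
import java.util.function.BiFunction;

// Hypothetical names throughout; this is not the real Actor interface.
class DefaultMethodCompatSketch {

    interface ActorLike<T> {
        void send(T message);

        // Added later as a default method, so older implementors still compile.
        default ActorLike<T> onError(BiFunction<Throwable, ? super T, ?> handler) {
            throw new UnsupportedOperationException(
                    "onError is not supported by this implementation");
        }
    }

    // Written before onError existed; it does not override the default.
    static class LegacyActor implements ActorLike<String> {
        @Override
        public void send(String message) { }
    }

    // Probes for support by calling onError and catching the default's exception.
    static boolean supportsOnError(ActorLike<String> actor) {
        try {
            actor.onError((throwable, message) -> null);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(supportsOnError(new LegacyActor())); // prints "false"
    }
}
```

The trade-off is that the failure moves from compile time to run time: a third-party implementation keeps compiling, but callers only find out that onError is unsupported when they invoke it.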
> h2. Notes for follow-up
> While adding tests, two Groovy 6 {{await}} parser-sugar quirks surfaced and
> are worth filing separately (not part of this change):
> # {{await\(x).is(y)}} parses as {{await(x.is(y))}} — the postfix chain after
> {{await(...)}} is consumed into its argument list.
> # {{await\(x)}} as a standalone statement is rejected with "Modifiers or
> return type is required" — {{await}} requires expression context.
> Both are worked around in the tests by binding {{def v = await(...)}} first.