[ 
https://issues.apache.org/jira/browse/GROOVY-12033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083883#comment-18083883
 ] 

ASF GitHub Bot commented on GROOVY-12033:
-----------------------------------------

Copilot commented on code in PR #2555:
URL: https://github.com/apache/groovy/pull/2555#discussion_r3311939401


##########
src/main/java/groovy/concurrent/ActorContext.java:
##########
@@ -0,0 +1,234 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package groovy.concurrent;
+
+import java.time.Duration;
+
+/**
+ * Context handle passed to context-aware actor handlers. Provides access
+ * to the executing actor itself and to the per-dispatch capabilities
+ * needed to express FSM-style actors: behavior swaps via
+ * {@link #become(ReactorHandler) become(...)}, message deferral via
+ * {@link #stash()} / {@link #unstashAll()}, and timed self-sends via
+ * {@link #scheduleOnce(Object, Duration) scheduleOnce(...)} /
+ * {@link #scheduleAtFixedRate(Object, Duration, Duration) 
scheduleAtFixedRate(...)}.
+ * <p>
+ * A context is scoped to a single actor and is only valid for use during
+ * a handler invocation (including a context-aware {@code onError}
+ * callback). Calls to its mutating methods from outside that window —
+ * for example from a captured reference invoked on another thread —
+ * throw {@link IllegalStateException}.
+ *
+ * @param <T> the actor's message type
+ * @see Actor
+ * @since 6.0.0
+ */
+public interface ActorContext<T> {
+
+    /**
+     * Returns the actor whose handler is currently executing.
+     */
+    Actor<T> self();
+
+    /**
+     * Replaces the handler used to process subsequent messages on a
+     * reactor actor. The swap takes effect on the next message — the
+     * current handler invocation completes normally, including binding
+     * any {@code sendAndGet} reply and firing {@code onError} on failure.
+     * <p>
+     * The new handler may declare a different reply type than the original;
+     * {@code sendAndGet} callers receive the new handler's return value.
+     * <p>
+     * Messages already queued at the moment of the swap, and messages sent
+     * by other threads that have not yet observed the swap, are dispatched
+     * to the new handler. The new handler is responsible for tolerating any
+     * message its predecessor could have received — typically by including
+     * a default branch that ignores or rejects unexpected messages, or by
+     * deferring them with {@link #stash()} for later replay.
+     *
+     * @param newHandler the replacement reactor handler
+     * @param <R>        the new reactor's reply type
+     * @throws UnsupportedOperationException if this actor is stateful, or
+     *         if this {@code ActorContext} implementation does not support
+     *         behavior swaps
+     * @throws IllegalStateException if called outside a handler dispatch
+     *         or from a thread other than the actor's worker thread
+     * @throws NullPointerException  if {@code newHandler} is null
+     * @since 6.0.0
+     */
+    default <R> void become(ReactorHandler<T, R> newHandler) {
+        throw new UnsupportedOperationException(
+                "become(ReactorHandler) requires a reactor actor");
+    }
+
+    /**
+     * Replaces the handler used to process subsequent messages on a
+     * stateful actor. The current state value is preserved verbatim and
+     * passed unchanged to the new handler. The swap takes effect on the
+     * next message.
+     * <p>
+     * The state type {@code S} is unchecked at the swap site: if the new
+     * handler's expected state type is incompatible with the actor's
+     * current state, a {@link ClassCastException} is thrown when the next
+     * message is dispatched.
+     *
+     * @param newHandler the replacement stateful handler
+     * @param <S>        the state type expected by the new handler
+     * @throws UnsupportedOperationException if this actor is a reactor, or
+     *         if this {@code ActorContext} implementation does not support
+     *         behavior swaps
+     * @throws IllegalStateException if called outside a handler dispatch
+     *         or from a thread other than the actor's worker thread
+     * @throws NullPointerException  if {@code newHandler} is null
+     * @since 6.0.0
+     */
+    default <S> void become(StatefulHandler<S, T> newHandler) {
+        throw new UnsupportedOperationException(
+                "become(StatefulHandler) requires a stateful actor");
+    }
+
+    /**
+     * Defers the message currently being processed. The message is moved
+     * out of the dispatch path: any {@code sendAndGet} reply remains
+     * unbound, any state change computed by the current handler is
+     * discarded, and the message is re-delivered when {@link #unstashAll()}
+     * is later called.
+     * <p>
+     * Calling {@code stash()} more than once during a single handler
+     * invocation is idempotent — the message is stashed once. If the
+     * handler subsequently throws, the stash is rolled back and the failure
+     * is reported normally (reply bound to error, {@code onError} fires).
+     * A context-aware {@code onError} callback may itself call
+     * {@code stash()} to defer the failed message for later retry.
+     * <p>
+     * Stashed messages do <em>not</em> count against the configured
+     * mailbox bound — the bound applies to the queue of pending sends,
+     * not to messages held in the stash.
+     * <p>
+     * <b>Warning — the stash buffer is unbounded by default.</b> An
+     * actor that stashes messages from a source whose volume you do not
+     * control (network input, external clients, untrusted callers) can
+     * grow the stash without limit and exhaust the JVM heap if the
+     * phase transition that would call {@link #unstashAll()} never
+     * arrives. For any such actor, configure a bound and overflow
+     * policy at construction time via
+     * {@link ActorOptions#withStashBound(int, ActorOptions.StashOverflow)}.
+     * The three policies are {@code FAIL} (this method throws),
+     * {@code DROP_OLDEST} (evicts the oldest stashed message, binding
+     * its reply to {@link IllegalStateException}), and {@code REJECT}
+     * (binds the current message's reply to {@link IllegalStateException}
+     * and does not stash it).
+     * <p>
+     * If {@link Actor#stop()} is invoked while messages are stashed, the
+     * stashed messages are rejected: any {@code sendAndGet} reply is bound
+     * to an {@link IllegalStateException} and fire-and-forget stashed
+     * messages are discarded.
+     *
+     * @throws IllegalStateException if called outside a handler dispatch
+     *         or from a thread other than the actor's worker thread
+     * @throws UnsupportedOperationException if this {@code ActorContext}
+     *         implementation does not support stash
+     * @since 6.0.0
+     */
+    default void stash() {
+        throw new UnsupportedOperationException(
+                "This ActorContext implementation does not support stash");
+    }
+
+    /**
+     * Re-injects all stashed messages at the head of the mailbox in the
+     * order they were originally stashed (FIFO). Subsequent dispatches
+     * will see the unstashed messages before any messages that other
+     * senders have queued in the meantime.
+     * <p>
+     * No-op if no messages are stashed.
+     *
+     * @throws IllegalStateException if called outside a handler dispatch
+     *         or from a thread other than the actor's worker thread
+     * @throws UnsupportedOperationException if this {@code ActorContext}
+     *         implementation does not support stash
+     * @since 6.0.0
+     */
+    default void unstashAll() {
+        throw new UnsupportedOperationException(
+                "This ActorContext implementation does not support 
unstashAll");
+    }
+
+    /**
+     * Schedules a one-shot self-send: the given message is delivered to
+     * this actor after the given delay, via the same dispatch path as
+     * {@link Actor#send(Object)} (mailbox bound respected,
+     * {@code onError} fires on handler failure, etc.).
+     * <p>
+     * The returned {@link Cancellable} can be used to call off the send
+     * before it fires. On {@link Actor#stop()}, all outstanding scheduled
+     * timers (one-shot and recurring) created via this context are
+     * cancelled automatically.
+     * <p>
+     * Timer firings race with the actor's lifecycle: a send that
+     * arrives after the actor has stopped is silently dropped (the
+     * implementation catches the {@link IllegalStateException} that
+     * {@code send} would throw). Combining a bounded mailbox using
+     * {@link ActorOptions.Overflow#BLOCK} with timers is discouraged —
+     * a full mailbox will block the shared scheduler thread; prefer
+     * {@link ActorOptions.Overflow#FAIL FAIL} or
+     * {@link ActorOptions.Overflow#DROP_NEWEST DROP_NEWEST} for

Review Comment:
   The Javadoc warns that combining timers with a bounded BLOCK mailbox will 
"block the shared scheduler thread", but `DefaultActor` now explicitly offloads 
the timer’s `send` onto `AsyncSupport.getExecutor()` to avoid blocking the 
scheduler thread. The warning should be updated to match the actual behavior 
(the scheduler thread won’t block, though executor threads may).
   



##########
src/spec/doc/core-concurrent-actors.adoc:
##########
@@ -219,19 +219,98 @@ assert balance == 70.0
 account.stop()
 ----
 
+[[actors-options]]
+== Construction options
+
+Actors take an optional `ActorOptions` configuring the mailbox, the
+executor, and a handful of opt-in behaviours. The builder is
+value-based; each `with*` method returns a new configuration.
+
+[source,groovy]
+----
+import groovy.concurrent.Actor
+import groovy.concurrent.ActorOptions
+
+def options = ActorOptions.DEFAULTS
+    .withBoundedMailbox(1000, ActorOptions.Overflow.BLOCK)
+
+def actor = Actor.reactor(handler, options)
+----
+
+=== Bounded mailbox
+
+By default the mailbox is unbounded. For backpressure — or to cap
+memory when producers can outrun the actor — configure a capacity and
+an overflow policy:
+
+[source,groovy]
+----
+import groovy.concurrent.ActorOptions.Overflow
+
+// BLOCK — sender blocks until capacity is free
+ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.BLOCK)
+
+// FAIL — send throws IllegalStateException when the mailbox is full
+ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.FAIL)
+
+// DROP_NEWEST — the incoming message is silently dropped; for
+// sendAndGet, the returned Awaitable binds to IllegalStateException
+ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.DROP_NEWEST)
+----
+
+A handler that calls `ctx.self().send(...)` on a full `BLOCK` mailbox
+would deadlock (the handler is the actor's only consumer). The actor
+detects this and fails fast with `IllegalStateException` rather than
+parking the worker thread.
+
+=== Per-actor executor
+
+Actors default to a shared async executor (virtual threads on
+JDK 21+). For workload isolation, hand the actor its own executor:
+
+[source,groovy]
+----
+import java.util.concurrent.Executors
+
+def pool = Executors.newSingleThreadExecutor()
+def actor = Actor.reactor(handler,
+    ActorOptions.DEFAULTS.withExecutor(pool))
+----
+
+Other options — `withStashBound` and `withCurrentSelf` — are covered
+in the <<actors-fsm,FSM section>> where they're directly used.
+
 [[actors-lifecycle]]
 == Lifecycle
 
-Both actors and agents support lifecycle management:
+Both actors and agents support lifecycle management. An actor has a
+three-state lifecycle:
+
+[cols="1,1,3"]
+|===
+|`isActive()` |`isTerminated()` |Meaning
+
+|`true`  |`false` |Accepting new sends and processing them.
+|`false` |`false` |Draining — `stop()` was called, no new sends are
+accepted, but already-queued messages are still being processed.
+|`false` |`true`  |Terminated — the worker thread has exited; all
+queued messages are processed and any stashed `sendAndGet` replies
+have been rejected.
+|===
 
 [source,groovy]
 ----
 def actor = Actor.reactor { it }
 assert actor.isActive()
+assert !actor.isTerminated()
+
+actor.stop()                                  // flips isActive() to false 
immediately
+assert !actor.isActive()                       // refuses new sends from this 
point on
 
-actor.stop()       // graceful: processes remaining messages then exits
-Thread.sleep(50)
-assert !actor.isActive()
+// The worker may still be draining queued messages. Poll for terminated
+// when you actually need to be sure the actor has finished shutting down.
+while (!actor.isTerminated()) Thread.sleep(10)

Review Comment:
   The lifecycle example states that after `actor.stop()` the actor "refuses 
new sends from this point on". In `DefaultActor`, sends that race with `stop()` 
can still be enqueued (it explicitly avoids treating POISON as an unconditional 
terminator to honor race-late sends). Consider softening this wording to 
acknowledge that sends already in flight may still land, while sends that 
observe `isActive()==false` will fail.



##########
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java:
##########
@@ -89,79 +190,610 @@ public boolean isActive() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
+    public boolean isTerminated() {
+        return terminated;
+    }
+
+    @Override
     public void stop() {
         if (!active) return;
         active = false;
-        // Poison pill signals the processing loop to exit after draining
-        queue.add(new Envelope<>(POISON, null));
+        // Cancel any outstanding scheduled timers so they do not fire
+        // into a stopped actor (one-shots would get IllegalStateException
+        // from send and be silently caught, but periodic timers would
+        // keep firing forever holding refs to this actor).
+        for (ScheduledFuture<?> f : pendingTimers) {
+            f.cancel(false);
+        }
+        pendingTimers.clear();
+        // Best-effort wake-up. The deque is internally unbounded
+        // (capacity is gated by the semaphore), so offerLast always
+        // succeeds — but the POISON itself is just a marker; the
+        // post-message drain check below is what actually terminates
+        // the loop.
+        queue.offerLast(new Envelope<>(POISON, null));
     }
 
-    // --

> groovy.concurrent.Actor: pre-GA hardening — Stop sentinel, error callback, 
> bounded mailbox, per-actor pool
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: GROOVY-12033
>                 URL: https://issues.apache.org/jira/browse/GROOVY-12033
>             Project: Groovy
>          Issue Type: Improvement
>            Reporter: Paul King
>            Assignee: Paul King
>            Priority: Major
>             Fix For: 6.0.0-alpha-2
>
>
> h2. Summary
> Add four production-readiness improvements to {{groovy.concurrent.Actor}} 
> introduced in alpha-1:
> # {{Actor.Stop}} return-value sentinel for in-handler self-termination
> # {{onError}} callback so handler exceptions are no longer silently swallowed 
> for fire-and-forget {{send}}
> # Bounded mailbox with overflow strategy ({{BLOCK}} / {{DROP_NEWEST}} / 
> {{FAIL}})
> # Per-actor executor selection
> All changes are additive and backward compatible. Existing factory methods 
> and the existing {{ActorTest}} pass unchanged.
> h2. Public API additions
> h3. {{groovy.concurrent.Actor}}
> * {{Object Stop}} — public sentinel constant. A handler returning 
> {{Actor.Stop}} causes the actor to stop gracefully after the current message 
> (semantics of {{stop()}}: queued messages still drain). For {{sendAndGet}} 
> callers the reply is bound with the {{Stop}} sentinel itself; detect by 
> reference equality.
> * {{default Actor<T> onError(BiFunction<Throwable, ? super T, ?> handler)}} — 
> registers a handler invoked when message processing throws. Receives 
> {{(throwable, message)}}. Return value is a control signal: {{Actor.Stop}} 
> stops the actor, anything else continues. Default implementation throws 
> {{UnsupportedOperationException}}; {{DefaultActor}} overrides.
> * {{static Actor<T> reactor(Function<T,R> handler, ActorOptions options)}} — 
> new overload.
> * {{static Actor<T> stateful(S initial, BiFunction<S,T,S> handler, 
> ActorOptions options)}} — new overload.
> h3. New: {{groovy.concurrent.ActorOptions}} (record)
> {code:java}
> public record ActorOptions(int mailboxCapacity, Overflow overflow, Executor 
> executor) {
>     public enum Overflow { BLOCK, DROP_NEWEST, FAIL }
>     public static final ActorOptions DEFAULTS;
>     public ActorOptions withBoundedMailbox(int capacity, Overflow strategy);
>     public ActorOptions withExecutor(Executor executor);
>     public boolean isBounded();
> }
> {code}
> * {{mailboxCapacity == 0}} → unbounded (current behaviour).
> * {{executor == null}} → uses {{AsyncSupport.getExecutor()}} (current 
> behaviour).
> * Canonical constructor rejects negative capacity and null overflow.
> h2. Behavioural changes in {{org.apache.groovy.runtime.async.DefaultActor}}
> * Constructor now takes {{ActorOptions}}; queue is 
> {{LinkedBlockingQueue(capacity)}} when bounded.
> * {{send(T)}} routes through an {{enqueue(...)}} helper that honours the 
> overflow policy:
> ** {{BLOCK}} → {{queue.put}} (back-pressures the sending thread; on interrupt 
> restores the flag and throws a wrapping {{RuntimeException}}).
> ** {{DROP_NEWEST}} → {{queue.offer}}; on overflow the message is silently 
> dropped, and {{sendAndGet}} replies bind an {{IllegalStateException}} so 
> awaiters don't hang.
> ** {{FAIL}} → {{queue.offer}}; on overflow throws {{IllegalStateException}}. 
> {{sendAndGet}} additionally binds the reply with the same exception before 
> rethrowing.
> * {{processLoop}} checks {{result == Actor.Stop}} after the handler runs and 
> calls {{stop()}} (graceful — pending messages drain).
> * {{processLoop}}'s catch block now invokes the registered {{onError}} 
> handler if present, in addition to binding the failure on the reply for 
> {{sendAndGet}}. If the {{onError}} handler returns {{Actor.Stop}} the actor 
> stops. Exceptions from the {{onError}} handler itself are caught and 
> discarded so the processing loop cannot be destabilised.
> * {{StatefulProcessor.process}} preserves the prior state when the handler 
> returns {{Actor.Stop}}, so any messages queued behind the Stop-trigger still 
> observe real state during drain.
> * {{stop()}} now offers the poison pill on a possibly-bounded queue, falling 
> back to {{put}} so termination always succeeds.
> h2. Files touched
> || File || Change ||
> | {{src/main/java/groovy/concurrent/Actor.java}} | Modified — Stop sentinel, 
> onError default, 2 factory overloads, doc updates on {{send}} |
> | {{src/main/java/groovy/concurrent/ActorOptions.java}} | New |
> | {{src/main/java/org/apache/groovy/runtime/async/DefaultActor.java}} | 
> Modified — see behavioural changes above |
> | {{src/test/groovy/groovy/concurrent/ActorTest.groovy}} | Extended — 13 new 
> tests (see below) |
> h2. Tests
> 13 new tests, all passing alongside the existing 13:
> * {{testStatefulSelfStopsOnStopSentinel}}, 
> {{testReactorSelfStopsOnStopSentinel}}
> * {{testStopSentinelDrainsQueuedMessages}} — confirms FIFO drain after Stop
> * {{testStatePreservedAcrossStopSentinel}} — confirms StatefulProcessor 
> doesn't overwrite state with Stop
> * {{testOnErrorFiresForFireAndForgetException}}
> * {{testOnErrorAlsoFiresForSendAndGet}} — confirms both the {{Awaitable}} 
> failure path and the {{onError}} callback fire
> * {{testOnErrorReturningStopTerminatesActor}}
> * {{testOnErrorHandlerExceptionIsSwallowed}} — confirms a throwing 
> {{onError}} doesn't break the loop
> * {{testBoundedMailboxFailOverflowThrows}}
> * {{testBoundedMailboxDropNewest}}
> * {{testBoundedMailboxDropNewestReplyBindsError}} — overflow on 
> {{sendAndGet}} surfaces via the {{Awaitable}}
> * {{testBoundedMailboxBlockBackpressures}} — confirms the sender thread 
> actually parks until a slot frees
> * {{testPerActorExecutorIsUsed}} — confirms the handler runs on the supplied 
> executor's thread
> * {{testActorOptionsRejectsNegativeCapacity}}, 
> {{testActorOptionsWithBoundedMailboxRejectsZero}}
> Full suite of 135 tests across the 18 {{groovy.concurrent.*}} test classes 
> still passes.
> h2. Out of scope / deferred
> Deferred to a future ticket (proposed for Groovy 7):
> * Idle / receive timeout ({{react(timeout)}} from GPars, 
> {{setReceiveTimeout}} from Pekko)
> * Restart-with-backoff supervision policy
> * Per-actor {{withTimers}} scheduling hub
> * Stash / unstash
> * {{become}}-style behaviour swap
> Not adopted (out of scope for {{groovy.concurrent}} entirely): actor 
> hierarchy with supervision trees, remote actors, persistence, sharding. These 
> are Pekko's domain; {{groovy.concurrent.Actor}} is for serialising in-process 
> mutable state.
> h2. Compatibility
> * Source compatible: no existing signature on the {{Actor}} interface or 
> {{DefaultActor}} factory methods changed.
> * Binary compatible for all existing call sites; existing 2-arg factories 
> route through {{ActorOptions.DEFAULTS}}.
> * {{onError}} is a {{default}} method, so existing implementors of {{Actor}} 
> compile unchanged (calls to {{onError}} on a non-{{DefaultActor}} 
> implementation throw {{UnsupportedOperationException}}).
> h2. Notes for follow-up
> While adding tests, two Groovy 6 {{await}} parser-sugar quirks surfaced and 
> are worth filing separately (not part of this change):
> # {{await\(x).is(y)}} parses as {{await(x.is(y))}} — the postfix chain after 
> {{await(...)}} is consumed into its argument list.
> # {{await\(x)}} as a standalone statement is rejected with "Modifiers or 
> return type is required" — {{await}} requires expression context.
> Both worked around in the tests by binding {{def v = await(...)}} first.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to