Copilot commented on code in PR #2555:
URL: https://github.com/apache/groovy/pull/2555#discussion_r3297473012


##########
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java:
##########
@@ -89,33 +146,109 @@ public boolean isActive() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void stop() {
         if (!active) return;
         active = false;
-        // Poison pill signals the processing loop to exit after draining
-        queue.add(new Envelope<>(POISON, null));
+        // Best-effort wake-up for an idle take(). If the queue is full
+        // (bounded mailbox), offer() fails and that is fine: the loop is
+        // not blocked, and the post-message !active check below will
+        // terminate processing once the queue drains. Never block here —
+        // stop() may be called from the actor's own thread, and put() on
+        // a full bounded queue would deadlock against the only consumer.
+        queue.offer(new Envelope<>(POISON, null));
+    }
+
+    @Override
+    public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) {
+        Objects.requireNonNull(handler, "handler must not be null");
+        // Adapt the context-free consumer to the internally-uniform shape.
+        this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg);
+        return this;
+    }
+
+    @Override
+    public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? super T> handler) {
+        Objects.requireNonNull(handler, "handler must not be null");
+        this.errorHandler = handler;
+        return this;
     }
 
     // ---- Internal -------------------------------------------------------
 
+    /**
+     * Routes an envelope into the mailbox according to the configured
+     * overflow policy. Callers are responsible for binding any reply
+     * with the thrown exception if needed.
+     */
+    private void enqueue(Envelope<T> envelope) {
+        if (!options.isBounded()) {
+            queue.add(envelope);
+            return;
+        }
+        switch (options.overflow()) {
+            case BLOCK -> {
+                try {
+                    queue.put(envelope);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while sending to actor", ie);
+                }
+            }
+            case DROP_NEWEST -> {
+                if (!queue.offer(envelope) && envelope.reply != null) {
+                    envelope.reply.bindError(new IllegalStateException(
+                            "mailbox full (capacity " + options.mailboxCapacity()
+                                    + "); message dropped"));
+                }
+            }
+            case FAIL -> {
+                if (!queue.offer(envelope)) {
+                    throw new IllegalStateException(
+                            "mailbox full (capacity " + options.mailboxCapacity() + ")");
+                }
+            }
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private void processLoop() {
         while (true) {
             try {
                 Envelope<T> envelope = queue.take();
                 if (envelope.message == POISON) return;
 

Review Comment:
   `processLoop` returns immediately when it dequeues the POISON envelope. If 
an in-flight `send`/`sendAndGet` passes the `active` check before `stop()` 
flips it, it can still enqueue *after* the poison pill. Those messages would 
never be processed, and any pending `sendAndGet` replies would never be bound. 
Consider treating POISON as a wake-up marker (skip it, then rely on the 
existing `!active && queue.isEmpty()` termination check) rather than an 
unconditional `return`.
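
A minimal sketch of the wake-up-marker idea, assuming a simplified loop rather than the actual `DefaultActor` code. The class `WakeUpMarkerLoop` and its `enqueue`, `drain` and `demo` methods are hypothetical names used only for illustration, and the loop runs on the calling thread to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the actor's processing loop; not the PR's code.
final class WakeUpMarkerLoop {
    static final Object POISON = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> processed = new ArrayList<>();
    private volatile boolean active = true;

    void enqueue(Object message) {
        queue.add(message);
    }

    void stop() {
        active = false;
        queue.offer(POISON); // best-effort wake-up only, never a termination signal
    }

    // Processes messages until the actor is stopped and the queue is empty.
    List<Object> drain() {
        try {
            while (true) {
                if (!active && queue.isEmpty()) return processed;
                Object message = queue.take();
                if (message == POISON) continue; // skip the marker, then re-check
                processed.add(message);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return processed;
        }
    }

    static List<Object> demo() {
        WakeUpMarkerLoop loop = new WakeUpMarkerLoop();
        loop.enqueue("a");
        loop.stop();
        loop.enqueue("late"); // simulates a send that raced past the active check
        return loop.drain();
    }
}
```

In this sketch the message queued behind the marker is still handled. A sender that enqueues after the final `isEmpty()` check would still be stranded, so this narrows the window rather than closing it.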



##########
src/main/java/groovy/concurrent/Actor.java:
##########
@@ -108,6 +123,91 @@ default void close() {
         stop();
     }
 
+    /**
+     * Registers a handler invoked when the message processor throws.
+     * <p>
+     * Fire-and-forget {@link #send} otherwise has no way to surface a
+     * handler exception; this hook is the supported way to log, record
+     * metrics for, or react to those failures. For {@link #sendAndGet}
+     * the failure is still reported through the returned {@code Awaitable};
+     * the {@code onError} handler runs in addition.
+     * <p>
+     * To stop the actor from inside an error handler, call
+     * {@link Actor#currentSelf() Actor.<T>currentSelf().stop()}, or use
+     * the context-aware overload {@link #onError(TriConsumer)}.

Review Comment:
   The `onError(BiConsumer)` Javadoc suggests using 
`Actor.currentSelf().stop()` from inside an error handler, but `currentSelf()` 
only works when the actor was constructed with 
`ActorOptions.withCurrentSelf(true)`. Either note this requirement here or 
steer users toward the context-aware `onError(TriConsumer)` overload for 
self-stop.
   



##########
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java:
##########
@@ -89,33 +146,109 @@ public boolean isActive() {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void stop() {
         if (!active) return;
         active = false;
-        // Poison pill signals the processing loop to exit after draining
-        queue.add(new Envelope<>(POISON, null));
+        // Best-effort wake-up for an idle take(). If the queue is full
+        // (bounded mailbox), offer() fails and that is fine: the loop is
+        // not blocked, and the post-message !active check below will
+        // terminate processing once the queue drains. Never block here —
+        // stop() may be called from the actor's own thread, and put() on
+        // a full bounded queue would deadlock against the only consumer.
+        queue.offer(new Envelope<>(POISON, null));
+    }
+
+    @Override
+    public Actor<T> onError(BiConsumer<Throwable, ? super T> handler) {
+        Objects.requireNonNull(handler, "handler must not be null");
+        // Adapt the context-free consumer to the internally-uniform shape.
+        this.errorHandler = (ctx, t, msg) -> handler.accept(t, msg);
+        return this;
+    }
+
+    @Override
+    public Actor<T> onError(TriConsumer<ActorContext<T>, Throwable, ? super T> handler) {
+        Objects.requireNonNull(handler, "handler must not be null");
+        this.errorHandler = handler;
+        return this;
     }
 
     // ---- Internal -------------------------------------------------------
 
+    /**
+     * Routes an envelope into the mailbox according to the configured
+     * overflow policy. Callers are responsible for binding any reply
+     * with the thrown exception if needed.
+     */
+    private void enqueue(Envelope<T> envelope) {
+        if (!options.isBounded()) {
+            queue.add(envelope);
+            return;
+        }
+        switch (options.overflow()) {
+            case BLOCK -> {
+                try {
+                    queue.put(envelope);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while sending to actor", ie);

Review Comment:
   For bounded mailboxes with `Overflow.BLOCK`, `enqueue` uses 
`queue.put(envelope)`. If a handler calls `send`/`sendAndGet` on its own actor 
while the mailbox is full, this will block the actor thread and deadlock (the 
only consumer is blocked inside the handler). Consider detecting calls from the 
actor's processing thread and failing fast (or using a non-blocking policy) to 
avoid self-deadlock.
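
One way the fail-fast check could look, as a sketch under stated assumptions: the class `SelfSendGuard`, the `actorThread` field and the `bindToCurrentThread` method are hypothetical and do not appear in the PR. The idea is to compare the calling thread with the thread that runs the processing loop and to use the non-blocking `offer()` in that case.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch; the real DefaultActor may track its thread differently.
final class SelfSendGuard {
    private final BlockingQueue<String> queue;
    private volatile Thread actorThread; // assumed to be set by the processing loop

    SelfSendGuard(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    void bindToCurrentThread() {
        actorThread = Thread.currentThread();
    }

    // BLOCK policy, except that a send from the actor's own thread never blocks.
    void enqueueBlocking(String message) {
        if (Thread.currentThread() == actorThread) {
            // The only consumer is the caller, so put() on a full queue could never return.
            if (!queue.offer(message)) {
                throw new IllegalStateException("mailbox full; self-send would deadlock");
            }
            return;
        }
        try {
            queue.put(message);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while sending to actor", ie);
        }
    }

    // Returns true when a self-send into a full mailbox fails fast instead of blocking.
    static boolean demo() {
        SelfSendGuard guard = new SelfSendGuard(1);
        guard.bindToCurrentThread();
        guard.enqueueBlocking("fits");
        try {
            guard.enqueueBlocking("overflow");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

Senders on other threads keep the blocking behaviour. Only the self-send path changes, which leaves the `BLOCK` back-pressure contract intact for external callers.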
   



##########
src/main/java/groovy/concurrent/Actor.java:
##########
@@ -72,7 +85,9 @@ public interface Actor<T> extends AutoCloseable {
      * asynchronously. Fire-and-forget — no reply is expected.
      *
      * @param message the message to send
-     * @throws IllegalStateException if the actor has been stopped
+     * @throws IllegalStateException if the actor has been stopped, or if
+     *         the mailbox is bounded with {@link ActorOptions.Overflow#FAIL}
+     *         and is full
      */
     void send(T message);
 

Review Comment:
   The `send` Javadoc now documents bounded-mailbox overflow behavior, but 
`sendAndGet` is also affected by `ActorOptions` (it may block for `BLOCK`, 
throw for `FAIL`, or complete exceptionally for `DROP_NEWEST`). Consider 
updating the `sendAndGet` Javadoc to describe these behaviors so callers aren’t 
surprised by blocking/exception semantics when enabling bounded mailboxes.



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()
+                seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossSelfStop() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CountDownLatch
+
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(100) { ActorContext ctx, int state, msg ->
+                gate.await()
+                if (msg == 'stop') { ctx.self().stop(); return state }
+                state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before stop drains
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            // Reply for the stop-trigger message is the handler's return value.
+            assert v2 == 101
+            // Third message saw preserved state, then incremented.
+            assert v3 == 102
+        '''
+    }
+
+    @Test
+    void testCurrentSelfOutsideHandlerThrows() {
+        shouldFail(IllegalStateException, '''
+            import groovy.concurrent.Actor
+            Actor.currentSelf()
+        ''')
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet() }
+
+            try { await(actor.sendAndGet('x')); assert false } catch (RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorContextCanStopActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { ActorContext ctx, Throwable t, msg -> ctx.self().stop() }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on hold
+            started.await()         // 'first' is off the queue when this returns
+            actor.send('a')         // queued (1/2)

Review Comment:
   `started.await()` has no timeout here. If the actor never reaches the 
handler (regression, scheduler issue), the test can hang indefinitely. Use a 
timed await and assert it returned true to make failures deterministic.
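
A small sketch of the timed-await pattern, written in Java against `java.util.concurrent.CountDownLatch`. The helper class `TimedAwait` and its methods are hypothetical and are not part of the PR or of Groovy's test utilities. The 50 ms timeout is an arbitrary value chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only.
final class TimedAwait {
    private TimedAwait() { }

    // Waits for the latch and fails the test, rather than hanging, if the timeout elapses.
    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) {
                throw new AssertionError("latch not released within " + timeout + " " + unit);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for latch", ie);
        }
    }

    // Returns true when awaitOrFail reports a timeout for a latch that is never released.
    static boolean timesOutOnStuckLatch() {
        try {
            awaitOrFail(new CountDownLatch(1), 50, TimeUnit.MILLISECONDS);
            return false;
        } catch (AssertionError expected) {
            return true;
        }
    }
}
```

Inside the Groovy test script the same effect is available without a helper, for example `assert started.await(2, java.util.concurrent.TimeUnit.SECONDS)`, which mirrors the timed `latch.await` already used in `testOnErrorFiresForFireAndForgetException`.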



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()
+                seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossSelfStop() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CountDownLatch
+
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(100) { ActorContext ctx, int state, msg ->
+                gate.await()
+                if (msg == 'stop') { ctx.self().stop(); return state }
+                state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before stop drains
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            // Reply for the stop-trigger message is the handler's return value.
+            assert v2 == 101
+            // Third message saw preserved state, then incremented.
+            assert v3 == 102
+        '''
+    }
+
+    @Test
+    void testCurrentSelfOutsideHandlerThrows() {
+        shouldFail(IllegalStateException, '''
+            import groovy.concurrent.Actor
+            Actor.currentSelf()
+        ''')
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet() }
+
+            try { await(actor.sendAndGet('x')); assert false } catch (RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorContextCanStopActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { ActorContext ctx, Throwable t, msg -> ctx.self().stop() }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on hold
+            started.await()         // 'first' is off the queue when this returns
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            try {
+                actor.send('c')     // overflow
+                assert false : 'expected IllegalStateException'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('mailbox full')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewest() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def seen = new CopyOnWriteArrayList()
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); seen << msg; msg }, options)
+
+            actor.send('first')     // currently being processed
+            started.await()
+            actor.send('a')         // queued (1/2)

Review Comment:
   `started.await()` is unbounded here; if the actor never starts processing, 
this test will hang. Prefer `started.await(timeout, unit)` with an assertion.



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()
+                seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossSelfStop() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CountDownLatch
+
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(100) { ActorContext ctx, int state, msg ->
+                gate.await()
+                if (msg == 'stop') { ctx.self().stop(); return state }
+                state + 1

Review Comment:
   `gate.await()` is unbounded inside the handler; if `gate.countDown()` is 
never reached, the actor thread blocks and the test hangs. Use a timed await 
and assert its result to avoid indefinite blocking in CI.
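
One way to bound the wait inside a handler is a small helper that throws on timeout, so the failure surfaces through the reply instead of hanging the actor thread. This is only a sketch: `TimedGate` and `awaitOrFail` are hypothetical names, not part of the PR, and the timeout value is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the PR or of Groovy.
final class TimedGate {
    private TimedGate() { }

    /**
     * Waits for the latch for at most the given timeout and throws
     * instead of blocking forever when the latch is never released.
     */
    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) {
                throw new IllegalStateException(
                        "gate not released within " + timeout + " " + unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for gate", e);
        }
    }
}
```

In the Groovy test script the same idea could probably be written inline as `assert gate.await(5, TimeUnit.SECONDS)`, mirroring the timed `latch.await(2, ...)` already used in `testOnErrorFiresForFireAndForgetException`.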



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()
+                seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossSelfStop() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CountDownLatch
+
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(100) { ActorContext ctx, int state, msg ->
+                gate.await()
+                if (msg == 'stop') { ctx.self().stop(); return state }
+                state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before stop drains
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            // Reply for the stop-trigger message is the handler's return value.
+            assert v2 == 101
+            // Third message saw preserved state, then incremented.
+            assert v3 == 102
+        '''
+    }
+
+    @Test
+    void testCurrentSelfOutsideHandlerThrows() {
+        shouldFail(IllegalStateException, '''
+            import groovy.concurrent.Actor
+            Actor.currentSelf()
+        ''')
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet() }
+
+            try { await(actor.sendAndGet('x')); assert false } catch (RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorContextCanStopActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { ActorContext ctx, Throwable t, msg -> ctx.self().stop() }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on hold
+            started.await()         // 'first' is off the queue when this returns
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            try {
+                actor.send('c')     // overflow
+                assert false : 'expected IllegalStateException'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('mailbox full')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewest() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def seen = new CopyOnWriteArrayList()
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); seen << msg; msg }, options)
+
+            actor.send('first')     // currently being processed
+            started.await()
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            actor.send('c')         // dropped silently
+            actor.send('d')         // dropped silently
+            hold.countDown()
+            // Wait for the three accepted messages
+            for (int i = 0; i < 40 && seen.size() < 3; i++) Thread.sleep(25)
+            assert seen.toList() == ['first', 'a', 'b']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewestReplyBindsError() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(1, ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('busy')         // occupies the handler
+            started.await()
+            actor.send('queued')        // fills the 1-slot queue
+            def dropped = actor.sendAndGet('dropped')  // overflows
+            try {
+                await(dropped)
+                assert false : 'awaiting a dropped sendAndGet should fail'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('dropped')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxBlockBackpressures() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicLong
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(1, ActorOptions.Overflow.BLOCK)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('first')      // being processed
+            started.await()

Review Comment:
   This `started.await()` has no timeout; if the first message isn't picked up, 
the test can block forever. Please switch to a timed await and assert it 
returns true.
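
A minimal sketch of the requested pattern on the test thread: wait for the "started" signal with a timeout and treat a `false` result as a failure. The class and method names here are hypothetical, and a plain thread stands in for the actor's handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; a plain thread stands in for the actor's handler.
final class StartedLatchDemo {
    private StartedLatchDemo() { }

    /** Returns true if the worker signalled that it started within the timeout. */
    static boolean workerStartedWithin(long timeout, TimeUnit unit) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch hold = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();           // signal: first message picked up
            try {
                hold.await(timeout, unit); // bounded, so the worker cannot leak
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            // Timed await: returns false instead of blocking forever.
            return started.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            hold.countDown();              // always release the worker
        }
    }
}
```

In the Groovy script this would likely reduce to `assert started.await(5, TimeUnit.SECONDS)` in place of the bare `started.await()`.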
   



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()
+                seen + 1
+            }
+            def r1 = actor.sendAndGet(1)
+            def r2 = actor.sendAndGet(2)
+            def r3 = actor.sendAndGet(3)
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert log == [1, 2, 3]
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testStatePreservedAcrossSelfStop() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CountDownLatch
+
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(100) { ActorContext ctx, int state, msg ->
+                gate.await()
+                if (msg == 'stop') { ctx.self().stop(); return state }
+                state + 1
+            }
+            def r1 = actor.sendAndGet('inc')
+            def r2 = actor.sendAndGet('stop')
+            def r3 = actor.sendAndGet('inc')      // queued before stop drains
+            gate.countDown()
+            def v1 = await(r1)
+            def v2 = await(r2)
+            def v3 = await(r3)
+            assert v1 == 101
+            // Reply for the stop-trigger message is the handler's return value.
+            assert v2 == 101
+            // Third message saw preserved state, then incremented.
+            assert v3 == 102
+        '''
+    }
+
+    @Test
+    void testCurrentSelfOutsideHandlerThrows() {
+        shouldFail(IllegalStateException, '''
+            import groovy.concurrent.Actor
+            Actor.currentSelf()
+        ''')
+    }
+
+    // === onError callback (GROOVY-12033) ===
+
+    @Test
+    void testOnErrorFiresForFireAndForgetException() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.atomic.AtomicReference
+
+            def latch = new CountDownLatch(1)
+            def captured = new AtomicReference<List>()
+            def actor = Actor.reactor { msg -> throw new RuntimeException("bang: $msg") }
+            actor.onError { Throwable t, msg ->
+                captured.set([t.message, msg])
+                latch.countDown()
+            }
+            actor.send('payload')
+            assert latch.await(2, java.util.concurrent.TimeUnit.SECONDS)
+            assert captured.get() == ['bang: payload', 'payload']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorAlsoFiresForSendAndGet() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def fired = new AtomicInteger()
+            def actor = Actor.reactor { throw new RuntimeException('boom') }
+            actor.onError { Throwable t, msg -> fired.incrementAndGet() }
+
+            try { await(actor.sendAndGet('x')); assert false } catch (RuntimeException expected) { }
+            for (int i = 0; i < 20 && fired.get() == 0; i++) Thread.sleep(25)
+            assert fired.get() == 1
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testOnErrorContextCanStopActor() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def actor = Actor.reactor { throw new RuntimeException('die') }
+            actor.onError { ActorContext ctx, Throwable t, msg -> ctx.self().stop() }
+            actor.send('trigger')
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testOnErrorHandlerExceptionIsSwallowed() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def processed = new AtomicInteger()
+            def actor = Actor.reactor { msg ->
+                processed.incrementAndGet()
+                if (msg == 'fail') throw new RuntimeException('first')
+                msg
+            }
+            actor.onError { Throwable t, msg -> throw new RuntimeException('handler also failed') }
+
+            actor.send('fail')
+            // Subsequent messages should still be processed even though the
+            // error handler itself threw.
+            assert await(actor.sendAndGet('ok')) == 'ok'
+            assert processed.get() == 2
+            actor.stop()
+        '''
+    }
+
+    // === Bounded mailbox (GROOVY-12033) ===
+
+    @Test
+    void testBoundedMailboxFailOverflowThrows() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.FAIL)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('first')     // taken by the handler, which blocks on hold
+            started.await()         // 'first' is off the queue when this returns
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            try {
+                actor.send('c')     // overflow
+                assert false : 'expected IllegalStateException'
+            } catch (IllegalStateException e) {
+                assert e.message.contains('mailbox full')
+            }
+            hold.countDown()
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewest() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+            import java.util.concurrent.CopyOnWriteArrayList
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def seen = new CopyOnWriteArrayList()
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(2, ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); seen << msg; msg }, options)
+
+            actor.send('first')     // currently being processed
+            started.await()
+            actor.send('a')         // queued (1/2)
+            actor.send('b')         // queued (2/2)
+            actor.send('c')         // dropped silently
+            actor.send('d')         // dropped silently
+            hold.countDown()
+            // Wait for the three accepted messages
+            for (int i = 0; i < 40 && seen.size() < 3; i++) Thread.sleep(25)
+            assert seen.toList() == ['first', 'a', 'b']
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testBoundedMailboxDropNewestReplyBindsError() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.CountDownLatch
+
+            def started = new CountDownLatch(1)
+            def hold = new CountDownLatch(1)
+            def options = ActorOptions.DEFAULTS.withBoundedMailbox(1, ActorOptions.Overflow.DROP_NEWEST)
+            def actor = Actor.reactor({ msg -> started.countDown(); hold.await(); msg }, options)
+
+            actor.send('busy')         // occupies the handler
+            started.await()
+            actor.send('queued')        // fills the 1-slot queue

Review Comment:
   `started.await()` without a timeout can hang the build if the actor doesn't 
begin processing (e.g. due to a regression). Use a timed await and assert 
success.



##########
src/test/groovy/groovy/concurrent/ActorTest.groovy:
##########
@@ -226,4 +226,381 @@ final class ActorTest {
             calc.stop()
         '''
     }
+
+    // === Self-stop from handler (GROOVY-12033) ===
+
+    @Test
+    void testStatefulSelfStopViaContext() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+
+            def max = 3
+            def bot = Actor.stateful(0) { ActorContext ctx, int count, msg ->
+                def next = count + 1
+                if (next >= max) ctx.self().stop()
+                next
+            }
+
+            bot.send('one')
+            bot.send('two')
+            bot.send('three')
+            for (int i = 0; i < 20 && bot.isActive(); i++) Thread.sleep(25)
+            assert !bot.isActive()
+        '''
+    }
+
+    @Test
+    void testReactorSelfStopViaCurrentSelf() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorOptions
+            import java.util.concurrent.atomic.AtomicInteger
+
+            def n = new AtomicInteger()
+            def options = ActorOptions.DEFAULTS.withCurrentSelf(true)
+            def actor = Actor.reactor({ msg ->
+                if (n.incrementAndGet() == 2) Actor.currentSelf().stop()
+                msg
+            }, options)
+
+            assert await(actor.sendAndGet('first')) == 'first'
+            assert await(actor.sendAndGet('second')) == 'second'
+            for (int i = 0; i < 20 && actor.isActive(); i++) Thread.sleep(25)
+            assert !actor.isActive()
+        '''
+    }
+
+    @Test
+    void testCurrentSelfThrowsWhenNotEnabled() {
+        // currentSelf() is opt-in: an actor built with the default options
+        // (no withCurrentSelf(true)) does not publish the thread-local,
+        // so the call throws even though we are inside a handler.
+        assertScript '''
+            import groovy.concurrent.Actor
+            import java.util.concurrent.atomic.AtomicReference
+
+            def captured = new AtomicReference<Throwable>()
+            def actor = Actor.reactor { msg ->
+                try { Actor.currentSelf().stop() }
+                catch (Throwable t) { captured.set(t) }
+                msg
+            }
+            assert await(actor.sendAndGet('x')) == 'x'
+            assert captured.get() instanceof IllegalStateException
+            assert captured.get().message.contains('withCurrentSelf')
+            actor.stop()
+        '''
+    }
+
+    @Test
+    void testSelfStopDrainsQueuedMessages() {
+        assertScript '''
+            import groovy.concurrent.Actor
+            import groovy.concurrent.ActorContext
+            import java.util.concurrent.CopyOnWriteArrayList
+            import java.util.concurrent.CountDownLatch
+
+            def log = new CopyOnWriteArrayList<Integer>()
+            // Gate the handler until all three sends are queued, otherwise
+            // the self-stop from msg 2 can flip active=false before
+            // sendAndGet(3) runs, causing an IllegalStateException.
+            def gate = new CountDownLatch(1)
+            def actor = Actor.stateful(0) { ActorContext ctx, int seen, Integer msg ->
+                gate.await()
+                log << msg
+                if (msg == 2) ctx.self().stop()

Review Comment:
   `gate.await()` is unbounded inside the actor handler. If `gate.countDown()` 
is never reached due to an earlier failure, the actor thread (and the test via 
`await(...)`) can hang indefinitely. Consider using a timed await and failing 
fast on timeout.
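
Alongside a timed await, the "earlier failure" case can also be covered by releasing the gate in a `finally` block, so the waiting thread is unblocked even when the setup steps throw. The sketch below illustrates that under assumptions: `GateReleaseDemo` and `workerReleased` are hypothetical names, and a plain thread stands in for the actor handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; a plain thread stands in for the gated actor handler.
final class GateReleaseDemo {
    private GateReleaseDemo() { }

    /**
     * Runs the setup while a worker waits on a gate, and reports whether
     * the worker got past the gate, including when the setup throws.
     */
    static boolean workerReleased(Runnable setup) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch passed = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                if (gate.await(5, TimeUnit.SECONDS)) { // bounded wait
                    passed.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            setup.run();       // e.g. the sends that must be queued first
        } catch (RuntimeException e) {
            // A real test would let this propagate; it is swallowed here
            // only so the demo can report the outcome.
        } finally {
            gate.countDown();  // always open the gate, even after a failure
        }
        try {
            return passed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The two techniques complement each other: the timeout bounds the worst case, while the `finally` release avoids waiting out the timeout when the test thread has already failed.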



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
