Copilot commented on code in PR #2386:
URL: https://github.com/apache/groovy/pull/2386#discussion_r2868417254
##########
src/antlr/GroovyParser.g4:
##########
@@ -778,13 +780,17 @@ expression
// must come before postfixExpression to resolve the ambiguities between
casting and call on parentheses expression, e.g. (int)(1 / 2)
: castParExpression castOperandExpression
#castExprAlt
+ // async closure/lambda must come before postfixExpression to resolve the
ambiguities between async and method call, e.g. async { ... }
+ | ASYNC nls closureOrLambdaExpression
#asyncClosureExprAlt
+
// qualified names, array expressions, method invocation, post inc/dec
| postfixExpression
#postfixExprAlt
| switchExpression
#switchExprAlt
- // ~(BNOT)/!(LNOT) (level 1)
+ // ~(BNOT)/!(LNOT)/await (level 1)
| (BITNOT | NOT) nls expression
#unaryNotExprAlt
+ | AWAIT nls expression
#awaitExprAlt
Review Comment:
Because `identifier` includes `AWAIT` and `expression` tries
`postfixExpression` before `awaitExprAlt`, input like `await foo` is likely
parsed as a command-chain starting from an identifier named `await` (or
`await(foo)`), so `awaitExprAlt`/`visitAwaitExprAlt` won’t trigger. This breaks
the intended keyword-lowering to `AsyncSupport.await(...)` (especially outside
`@Async` contexts). Consider giving `awaitExprAlt` higher priority than
`postfixExprAlt` (similar to `asyncClosureExprAlt`) or adding a predicate-based
disambiguation so `await <expr>` reliably parses as the await operator.
##########
src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.groovy.runtime.async;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * A producer/consumer implementation of {@link AsyncStream} used by
+ * {@code async} methods that contain {@code yield return} statements.
+ * <p>
+ * The producer (method body) runs on a separate thread and calls
+ * {@link #yield(Object)} for each emitted element. The consumer
+ * calls {@link #moveNext()}/{@link #getCurrent()} — typically via
+ * a {@code for await} loop.
+ * <p>
+ * Uses a {@link SynchronousQueue} to provide natural back-pressure:
+ * the producer thread blocks at each {@code yield return} until the
+ * consumer has consumed the previous element (mirroring C#'s async
+ * iterator suspension semantics).
+ * <p>
+ * This class is an internal implementation detail and should not be referenced
+ * directly by user code.
+ *
+ * @param <T> the element type
+ * @since 6.0.0
+ */
+public class AsyncStreamGenerator<T> implements AsyncStream<T> {
+
+ private static final Object DONE = new Object();
+
+ private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
+ private T current;
+
+ /**
+ * Produces the next element. Called from the generator body when
+ * a {@code yield return expr} statement is executed. Blocks until
+ * the consumer is ready.
+ */
+ public void yield(Object value) {
+ try {
+ queue.put(new Item(value));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new java.util.concurrent.CancellationException("Interrupted
during yield");
+ }
+ }
Review Comment:
`AsyncStreamGenerator` uses a `SynchronousQueue`, so the producer thread
blocks on every `yield(...)` until the consumer calls `moveNext()`. If the
consumer exits early (break/return/exception in `for await` body), the producer
can block forever, leaking a thread/virtual-thread and potentially exhausting
the fallback fixed thread pool on JDK < 21. Consider adding an explicit
cancellation/close signal (e.g., `AsyncStream` implements `AutoCloseable` with
a no-op default, generator overrides `close()` to unblock the producer) and
ensure the `for await` lowering closes the stream in a `finally` block.
##########
src/main/java/org/codehaus/groovy/transform/AsyncASTTransformation.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform;
+
+import groovy.concurrent.AsyncStream;
+import groovy.concurrent.Awaitable;
+import groovy.transform.Async;
+import org.apache.groovy.runtime.async.AsyncSupport;
+import org.codehaus.groovy.ast.ASTNode;
+import org.codehaus.groovy.ast.AnnotatedNode;
+import org.codehaus.groovy.ast.AnnotationNode;
+import org.codehaus.groovy.ast.ClassCodeExpressionTransformer;
+import org.codehaus.groovy.ast.ClassCodeVisitorSupport;
+import org.codehaus.groovy.ast.ClassHelper;
+import org.codehaus.groovy.ast.ClassNode;
+import org.codehaus.groovy.ast.FieldNode;
+import org.codehaus.groovy.ast.GenericsType;
+import org.codehaus.groovy.ast.MethodNode;
+import org.codehaus.groovy.ast.Parameter;
+import org.codehaus.groovy.ast.VariableScope;
+import org.codehaus.groovy.ast.CodeVisitorSupport;
+import org.codehaus.groovy.ast.expr.ArgumentListExpression;
+import org.codehaus.groovy.ast.expr.ClassExpression;
+import org.codehaus.groovy.ast.expr.ClosureExpression;
+import org.codehaus.groovy.ast.expr.Expression;
+import org.codehaus.groovy.ast.expr.MethodCallExpression;
+import org.codehaus.groovy.ast.expr.StaticMethodCallExpression;
+import org.codehaus.groovy.ast.expr.VariableExpression;
+import org.codehaus.groovy.ast.stmt.ExpressionStatement;
+import org.codehaus.groovy.ast.stmt.Statement;
+import org.codehaus.groovy.control.CompilePhase;
+import org.codehaus.groovy.control.SourceUnit;
+
+import static org.codehaus.groovy.ast.tools.GeneralUtils.args;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.block;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.callX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.varX;
+import static
org.codehaus.groovy.ast.tools.GenericsUtils.makeClassSafeWithGenerics;
+
+/**
+ * Handles code generation for the {@link Async @Async} annotation.
+ * <p>
+ * Transforms the annotated method so that:
+ * <ol>
+ * <li>{@code await(expr)} calls within the method body are redirected to
+ * {@link AsyncSupport#await(Object) AsyncSupport.await()}</li>
+ * <li>The method body is executed asynchronously via
+ * {@link AsyncSupport#executeAsync AsyncSupport.executeAsync}
+ * (or {@link AsyncSupport#executeAsyncVoid
AsyncSupport.executeAsyncVoid}
+ * for {@code void} methods)</li>
+ * <li>Generator methods (containing {@code yield return}) are transformed to
+ * use {@link AsyncSupport#generateAsyncStream
AsyncSupport.generateAsyncStream},
+ * returning an {@link AsyncStream}{@code <T>}</li>
+ * <li>The return type becomes {@link Awaitable}{@code <T>}
+ * (or {@link AsyncStream}{@code <T>} for generators)</li>
+ * </ol>
+ * <p>
+ * This transformation runs during the {@link CompilePhase#CANONICALIZATION}
+ * phase — before type resolution, which allows the modified return types to
+ * participate in normal type checking.
+ *
+ * @see Async
+ * @see AsyncSupport
+ * @since 6.0.0
+ */
+@GroovyASTTransformation(phase = CompilePhase.CANONICALIZATION)
+public class AsyncASTTransformation extends AbstractASTTransformation {
+
+ private static final Class<?> MY_CLASS = Async.class;
+ private static final ClassNode MY_TYPE = ClassHelper.make(MY_CLASS);
+ private static final String MY_TYPE_NAME = "@" +
MY_TYPE.getNameWithoutPackage();
+ private static final ClassNode AWAITABLE_TYPE =
ClassHelper.make(Awaitable.class);
+ private static final ClassNode ASYNC_STREAM_TYPE =
ClassHelper.make(AsyncStream.class);
+ private static final ClassNode ASYNC_UTILS_TYPE =
ClassHelper.make(AsyncSupport.class);
+
+ @Override
+ public void visit(ASTNode[] nodes, SourceUnit source) {
+ init(nodes, source);
+ AnnotatedNode parent = (AnnotatedNode) nodes[1];
+ AnnotationNode anno = (AnnotationNode) nodes[0];
+ if (!MY_TYPE.equals(anno.getClassNode())) return;
+
+ if (!(parent instanceof MethodNode mNode)) return;
+
+ // Validate
+ if (mNode.isAbstract()) {
+ addError(MY_TYPE_NAME + " cannot be applied to abstract method '"
+ mNode.getName() + "'", mNode);
+ return;
+ }
+ if ("<init>".equals(mNode.getName()) ||
"<clinit>".equals(mNode.getName())) {
+ addError(MY_TYPE_NAME + " cannot be applied to constructors",
mNode);
+ return;
+ }
+ ClassNode originalReturnType = mNode.getReturnType();
+ if (AWAITABLE_TYPE.getName().equals(originalReturnType.getName())
+ ||
ASYNC_STREAM_TYPE.getName().equals(originalReturnType.getName())
+ ||
"java.util.concurrent.CompletableFuture".equals(originalReturnType.getName())) {
+ addError(MY_TYPE_NAME + " cannot be applied to a method that
already returns an async type", mNode);
+ return;
+ }
+ Statement originalBody = mNode.getCode();
+ if (originalBody == null) return;
+
+ ClassNode cNode = mNode.getDeclaringClass();
+
+ // Resolve executor expression
+ String executorFieldName = getMemberStringValue(anno, "executor");
+ Expression executorExpr;
+ if (executorFieldName != null && !executorFieldName.isEmpty()) {
+ FieldNode field = cNode.getDeclaredField(executorFieldName);
+ if (field == null) {
+ addError(MY_TYPE_NAME + ": executor field '" +
executorFieldName
+ + "' not found in class " + cNode.getName(), mNode);
+ return;
+ }
+ if (mNode.isStatic() && !field.isStatic()) {
+ addError(MY_TYPE_NAME + ": executor field '" +
executorFieldName
+ + "' must be static for static method '" +
mNode.getName() + "'", mNode);
+ return;
+ }
+ executorExpr = varX(executorFieldName);
Review Comment:
When resolving the custom executor field, `varX(executorFieldName)` creates
an unbound `VariableExpression` and can be accidentally captured by a
same-named local variable/parameter, resulting in using the wrong executor at
runtime. Since you already have the `FieldNode`, bind the expression to that
field explicitly (e.g., create a `VariableExpression` from the `FieldNode` /
set accessed variable) to ensure it always references the intended field.
```suggestion
VariableExpression executorVar = varX(executorFieldName);
executorVar.setAccessedVariable(field);
executorExpr = executorVar;
```
##########
src/test/groovy/org/codehaus/groovy/transform/AsyncVirtualThreadTest.groovy:
##########
@@ -0,0 +1,1115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+import org.junit.jupiter.api.Test
+
+import static groovy.test.GroovyAssert.assertScript
+
+/**
+ * Tests for virtual thread integration, executor configuration, exception
+ * handling consistency across async methods/closures/lambdas, and coverage
+ * improvements for the async/await feature.
+ *
+ * @since 6.0.0
+ */
+final class AsyncVirtualThreadTest {
+
+ // ---- Virtual thread detection and usage ----
+
+ @Test
+ void testVirtualThreadsAvailable() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ // JDK 21+ should have virtual threads
+ def jdkVersion = Runtime.version().feature()
+ if (jdkVersion >= 21) {
+ assert isVirtualThreadsAvailable()
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncMethodRunsOnVirtualThread() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class VTService {
+ async checkThread() {
+ return Thread.currentThread().isVirtual()
+ }
+ }
+
+ if (isVirtualThreadsAvailable()) {
+ def service = new VTService()
+ def result = service.checkThread().get()
+ assert result == true
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncClosureRunsOnVirtualThread() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ if (isVirtualThreadsAvailable()) {
+ def awaitable = async { Thread.currentThread().isVirtual() }
+ assert await(awaitable) == true
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncLambdaRunsOnVirtualThread() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ if (isVirtualThreadsAvailable()) {
+ def asyncFn = async { x -> Thread.currentThread().isVirtual() }
+ def result = await(asyncFn(42))
+ assert result == true
+ }
+ '''
+ }
+
+ @Test
+ void testForAwaitRunsOnVirtualThread() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.AsyncStream
+
+ class VTGenerator {
+ async generate() {
+ yield return Thread.currentThread().isVirtual()
+ }
+ }
+
+ if (isVirtualThreadsAvailable()) {
+ def gen = new VTGenerator()
+ def stream = gen.generate()
+ def results = []
+ for await (item in stream) {
+ results << item
+ }
+ assert results == [true]
+ }
+ '''
+ }
+
+ @Test
+ void testHighConcurrencyWithVirtualThreads() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class HighConcurrency {
+ async compute(int n) {
+ Thread.sleep(10)
+ return n * 2
+ }
+ }
+
+ if (isVirtualThreadsAvailable()) {
+ def svc = new HighConcurrency()
+ // Launch 1000 concurrent tasks — trivial with virtual threads
+ def awaitables = (1..1000).collect { svc.compute(it) }
+ def results = awaitAll(awaitables as Object[])
+ assert results.size() == 1000
+ assert results[0] == 2
+ assert results[999] == 2000
+ }
+ '''
+ }
+
+ // ---- Executor configuration ----
+
+ @Test
+ void testCustomExecutorOverride() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executors
+ import java.util.concurrent.atomic.AtomicReference
+
+ def savedExecutor = getExecutor()
+ try {
+ def customPool = Executors.newFixedThreadPool(2, { r ->
+ def t = new Thread(r)
+ t.setName("custom-async-" + t.getId())
+ t
+ })
+ setExecutor(customPool)
+
+ def awaitable = async {
+ Thread.currentThread().getName()
+ }
+ def threadName = await(awaitable)
+ assert threadName.startsWith("custom-async-")
+ } finally {
+ setExecutor(savedExecutor)
+ }
+ '''
+ }
+
+ @Test
+ void testSetExecutorNullResetsToDefault() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executors
+
+ def originalExecutor = getExecutor()
+ // Set a custom executor
+ setExecutor(Executors.newSingleThreadExecutor())
+ assert getExecutor() != originalExecutor
+ // Reset to null — should restore default
+ setExecutor(null)
+ def restored = getExecutor()
+ assert restored != null
+ // Verify it works
+ def awaitable = groovy.concurrent.AsyncUtils.async { 42 }
+ assert groovy.concurrent.AsyncUtils.await(awaitable) == 42
+ // Restore original
+ setExecutor(originalExecutor)
+ '''
+ }
+
+ @Test
+ void testAsyncMethodWithCustomExecutorField() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executor
+ import java.util.concurrent.Executors
+
+ class CustomExecutorService {
+ static Executor myPool = Executors.newFixedThreadPool(1, { r ->
+ def t = new Thread(r)
+ t.setName("my-pool-thread")
+ t
+ })
+
+ @Async(executor = "myPool")
+ def doWork() {
+ return Thread.currentThread().getName()
+ }
+ }
+
+ def svc = new CustomExecutorService()
+ def result = svc.doWork().get()
+ assert result.startsWith("my-pool-thread")
+ '''
+ }
+
+ // ---- Exception handling consistency: async method vs closure vs lambda
----
+
+ @Test
+ void testCheckedExceptionConsistencyAcrossAsyncMethods() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class ExcService {
+ async failWithChecked() {
+ throw new java.io.IOException("async method error")
+ }
+ }
+
+ def svc = new ExcService()
+ try {
+ await(svc.failWithChecked())
+ assert false : "Should have thrown"
+ } catch (java.io.IOException e) {
+ assert e.message == "async method error"
+ }
+ '''
+ }
+
+ @Test
+ void testCheckedExceptionConsistencyAcrossClosures() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def awaitable = async { throw new java.io.IOException("closure
error") }
+ try {
+ await(awaitable)
+ assert false : "Should have thrown"
+ } catch (java.io.IOException e) {
+ assert e.message == "closure error"
+ }
+ '''
+ }
+
+ @Test
+ void testCheckedExceptionConsistencyAcrossLambdas() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def asyncFn = async { x -> throw new java.io.IOException("lambda
error: ${x}") }
+ try {
+ await(asyncFn("test"))
+ assert false : "Should have thrown"
+ } catch (java.io.IOException e) {
+ assert e.message == "lambda error: test"
+ }
+ '''
+ }
+
+ @Test
+ void testRuntimeExceptionConsistencyAllForms() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ // Form 1: async method
+ class Svc {
+ async failMethod() { throw new IllegalStateException("from
method") }
+ }
+
+ // Form 2: async closure
+ def closure = async { throw new IllegalArgumentException("from
closure") }
+
+ // Form 3: async lambda with params
+ def lambda = async { x -> throw new
UnsupportedOperationException("from lambda") }
+
+ // All should throw the exact exception type (no wrapping)
+ try { await(new Svc().failMethod()); assert false }
+ catch (IllegalStateException e) { assert e.message == "from
method" }
+
+ try { await(closure); assert false }
+ catch (IllegalArgumentException e) { assert e.message == "from
closure" }
+
+ try { await(lambda("x")); assert false }
+ catch (UnsupportedOperationException e) { assert e.message ==
"from lambda" }
+ '''
+ }
+
+ @Test
+ void testErrorPropagationConsistencyAllForms() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class ErrorSvc {
+ async fail() { throw new StackOverflowError("method") }
+ }
+
+ // async method
+ try { await(new ErrorSvc().fail()); assert false }
+ catch (StackOverflowError e) { assert e.message == "method" }
+
+ // async closure
+ try { await(async { throw new StackOverflowError("closure") });
assert false }
+ catch (StackOverflowError e) { assert e.message == "closure" }
+
+ // async lambda
+ def fn = async { x -> throw new StackOverflowError("lambda") }
+ try { await(fn(1)); assert false }
+ catch (StackOverflowError e) { assert e.message == "lambda" }
+ '''
+ }
+
+ // ---- Async stream (yield return) consistency ----
+
+ @Test
+ void testYieldReturnConsistencyMethodVsClosure() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ // Form 1: async method with yield return
+ class GenSvc {
+ async range(int n) {
+ for (int i = 1; i <= n; i++) {
+ yield return i
+ }
+ }
+ }
+
+ // Form 2: async closure with yield return
+ def closureGen = async { for (int i = 1; i <= 3; i++) { yield
return i * 10 } }
+
+ // Form 3: async lambda with yield return + params
+ def lambdaGen = async { n -> for (int i = 1; i <= n; i++) { yield
return i * 100 } }
+
+ // Verify all produce correct streams
+ def methodResults = []
+ for await (item in new GenSvc().range(3)) { methodResults << item }
+ assert methodResults == [1, 2, 3]
+
+ def closureResults = []
+ for await (item in closureGen) { closureResults << item }
+ assert closureResults == [10, 20, 30]
+
+ def lambdaResults = []
+ for await (item in lambdaGen(3)) { lambdaResults << item }
+ assert lambdaResults == [100, 200, 300]
+ '''
+ }
+
+ @Test
+ void testYieldReturnExceptionConsistencyAllForms() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ // async method generator with error
+ class FailGen {
+ async failing() {
+ yield return 1
+ throw new java.io.IOException("gen method error")
+ }
+ }
+
+ // async closure generator with error
+ def closureFailGen = async {
+ yield return 10
+ throw new java.io.IOException("gen closure error")
+ }
+
+ // async lambda generator with error
+ def lambdaFailGen = async { x ->
+ yield return x
+ throw new java.io.IOException("gen lambda error")
+ }
+
+ // All should propagate IOException through for-await
+ for (gen in [new FailGen().failing(), closureFailGen,
lambdaFailGen(100)]) {
+ def items = []
+ try {
+ for await (item in gen) { items << item }
+ assert false : "Should have thrown"
+ } catch (java.io.IOException e) {
+ assert e.message.contains("gen")
+ assert items.size() == 1
+ }
+ }
+ '''
+ }
+
+ // ---- executeAsync / executeAsyncVoid (unified path) ----
+
+ @Test
+ void testExecuteAsyncWithCustomExecutor() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executors
+
+ def pool = Executors.newSingleThreadExecutor({ r ->
+ def t = new Thread(r)
+ t.setName("exec-async-test")
+ t
+ })
+
+ def awaitable = executeAsync({ ->
+ Thread.currentThread().getName()
+ }, pool)
+ def result = await(awaitable)
+ assert result.startsWith("exec-async-test")
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testExecuteAsyncVoidWithCustomExecutor() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executors
+ import java.util.concurrent.atomic.AtomicBoolean
+
+ def pool = Executors.newSingleThreadExecutor()
+ def executed = new AtomicBoolean(false)
+
+ def awaitable = executeAsyncVoid({ ->
+ executed.set(true)
+ }, pool)
+ def _v1 = await(awaitable)
+ assert executed.get()
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testAsyncVoidMethodReturnsAwaitable() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicInteger
+
+ class VoidService {
+ static AtomicInteger counter = new AtomicInteger(0)
+
+ @Async
+ void increment() {
+ counter.incrementAndGet()
+ }
+ }
+
+ def svc = new VoidService()
+ def awaitable = svc.increment()
+ assert awaitable instanceof Awaitable
+ def _v2 = await(awaitable)
+ assert VoidService.counter.get() == 1
+ '''
+ }
+
+ // ---- Edge cases and coverage improvements ----
+
+ @Test
+ void testAwaitNull() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ def result = await((Object) null)
+ assert result == null
+ '''
+ }
+
+ @Test
+ void testAwaitAlreadyCompletedAwaitable() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def awaitable = Awaitable.of(42)
+ assert awaitable.isDone()
+ def result = await(awaitable)
+ assert result == 42
+ '''
+ }
+
+ @Test
+ void testAwaitFailedAwaitable() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def awaitable = Awaitable.failed(new
RuntimeException("pre-failed"))
+ assert awaitable.isCompletedExceptionally()
+ try {
+ await(awaitable)
+ assert false
+ } catch (RuntimeException e) {
+ assert e.message == "pre-failed"
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitCancelledAwaitable() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+ import java.util.concurrent.CancellationException
+
+ def cf = new CompletableFuture()
+ def awaitable = GroovyPromise.of(cf)
+ awaitable.cancel()
+ assert awaitable.isCancelled()
+ try {
+ await(awaitable)
+ assert false
+ } catch (CancellationException e) {
+ // expected
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncWithReturnValue() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def awaitable = async { return "hello" }
+ assert await(awaitable) == "hello"
+ '''
+ }
+
+ @Test
+ void testAsyncWithNullReturnValue() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def awaitable = async { return null }
+ assert await(awaitable) == null
+ '''
+ }
+
+ @Test
+ void testMultipleConcurrentAsyncGenerators() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def gen1 = async { for (int i = 0; i < 5; i++) { yield return
"A${i}" } }
+ def gen2 = async { for (int i = 0; i < 5; i++) { yield return
"B${i}" } }
+
+ def results1 = []
+ def results2 = []
+ for await (item in gen1) { results1 << item }
+ for await (item in gen2) { results2 << item }
+ assert results1 == ["A0", "A1", "A2", "A3", "A4"]
+ assert results2 == ["B0", "B1", "B2", "B3", "B4"]
+ '''
+ }
+
+ @Test
+ void testEmptyAsyncStream() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class EmptyGen {
+ async empty() {
+ // A generator must have yield return to be detected as
one.
+ // This has one but it's unreachable at runtime.
+ if (false) yield return "unreachable"
+ }
+ }
+
+ def results = []
+ for await (item in new EmptyGen().empty()) {
+ results << item
+ }
+ assert results.isEmpty()
+ '''
+ }
+
+ @Test
+ void testAwaitableCompositionWithThen() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def original = async { 10 }
+ def doubled = original.then { it * 2 }
+ def result = await(doubled)
+ assert result == 20
+ '''
+ }
+
+ @Test
+ void testAwaitableCompositionWithThenCompose() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def first = async { 5 }
+ def chained = first.thenCompose { v -> async { v + 10 } }
+ def result = await(chained)
+ assert result == 15
+ '''
+ }
+
+ @Test
+ void testAwaitableExceptionally() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def failing = async { throw new RuntimeException("oops") }
+ def recovered = failing.exceptionally { e -> "recovered:
${e.message}" }
+ def result = await(recovered)
+ assert result == "recovered: oops"
+ '''
+ }
+
+ @Test
+ void testAwaitWithCompletionStage() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.CompletableFuture
+ import java.util.concurrent.CompletionStage
+
+ CompletionStage stage = CompletableFuture.supplyAsync { "from
stage" }
+ def result = await(stage)
+ assert result == "from stage"
+ '''
+ }
+
+ @Test
+ void testAwaitWithFuture() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.Executors
+ import java.util.concurrent.Future
+
+ def pool = Executors.newSingleThreadExecutor()
+ Future future = pool.submit({ "from future" } as
java.util.concurrent.Callable)
+ def result = await(future)
+ assert result == "from future"
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testAwaitAllWithMixedTypes() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ def a1 = async { 1 }
+ def a2 = CompletableFuture.supplyAsync { 2 }
+ def a3 = Awaitable.of(3)
+
+ def results = awaitAll(a1, a2, a3)
+ assert results == [1, 2, 3]
+ '''
+ }
+
+ @Test
+ void testAwaitAnyReturnsFirst() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ def fast = Awaitable.of("fast")
+ def slow = async { Thread.sleep(500); "slow" }
+
+ def result = awaitAny(fast, slow)
+ assert result == "fast"
+ '''
+ }
+
+ @Test
+ void testAwaitAllSettledMixed() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def success = async { "ok" }
+ def failure = async { throw new RuntimeException("fail") }
+
+ def results = awaitAllSettled(success, failure)
+ assert results.size() == 2
+ assert results[0].isSuccess()
+ assert results[0].getValue() == "ok"
+ assert results[1].isFailure()
+ assert results[1].getError().message == "fail"
+ '''
+ }
+
+ @Test
+ void testAsyncMethodWithPrimitiveReturnType() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class PrimService {
+ @Async
+ int computeInt() { return 42 }
+
+ @Async
+ boolean checkBool() { return true }
+
+ @Async
+ double computeDouble() { return 3.14 }
+ }
+
+ def svc = new PrimService()
+ assert await(svc.computeInt()) == 42
+ assert await(svc.checkBool()) == true
+ assert Math.abs(await(svc.computeDouble()) - 3.14) < 0.001
+ '''
+ }
+
+ @Test
+ void testDeepUnwrapNestedExceptions() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.CompletionException
+ import java.util.concurrent.ExecutionException
+
+ // Create deeply nested exception chain
+ def original = new java.io.IOException("deep")
+ def wrapped = new CompletionException(new ExecutionException(
+ new java.lang.reflect.UndeclaredThrowableException(
+ new
java.lang.reflect.InvocationTargetException(original))))
+
+ def cf = new java.util.concurrent.CompletableFuture()
+ cf.completeExceptionally(wrapped)
+
+ try {
+ await(cf)
+ assert false
+ } catch (java.io.IOException e) {
+ assert e.message == "deep"
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncWithNestedAwait() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class NestedService {
+ async inner(int v) { return v * 2 }
+
+ async outer(int v) {
+ def intermediate = await inner(v)
+ return await inner(intermediate)
+ }
+ }
+
+ def svc = new NestedService()
+ def result = await(svc.outer(5))
+ assert result == 20 // 5 * 2 * 2
+ '''
+ }
+
+ @Test
+ void testAsyncClosureWithNestedAwait() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def inner = async { x -> x + 1 }
+ def outer = async {
+ def v1 = await(inner(10))
+ def v2 = await(inner(v1))
+ return v2
+ }
+ assert await(outer) == 12 // 10 + 1 + 1
+ '''
+ }
+
+ @Test
+ void testParallelAwaitInAsyncMethod() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.CompletableFuture
+
+ class ParallelService {
+ CompletableFuture<Integer> compute(int v) {
+ return CompletableFuture.supplyAsync {
+ Thread.sleep(50)
+ return v * 2
+ }
+ }
+
+ async parallel() {
+ def f1 = compute(1)
+ def f2 = compute(2)
+ def f3 = compute(3)
+ return await(f1) + await(f2) + await(f3)
+ }
+ }
+
+ def svc = new ParallelService()
+ long start = System.currentTimeMillis()
+ def result = await(svc.parallel())
+ long elapsed = System.currentTimeMillis() - start
+ assert result == 12 // 2 + 4 + 6
+ assert elapsed < 500 // parallel, not sequential
+ '''
+ }
+
+ @Test
+ void testToAsyncStreamConversion() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.AsyncStream
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.AwaitableAdapter
+ import groovy.concurrent.AwaitableAdapterRegistry
+
+ // Register an adapter for List that provides an AsyncStream
+ AwaitableAdapterRegistry.register(new AwaitableAdapter() {
+ boolean supportsAwaitable(Class type) { return false }
+ Awaitable toAwaitable(Object source) { return null }
+ boolean supportsAsyncStream(Class type) { return
List.isAssignableFrom(type) }
+ AsyncStream toAsyncStream(Object source) {
+ def list = (List) source
+ def iter = list.iterator()
+ return new AsyncStream() {
+ def current
+ Awaitable<Boolean> moveNext() {
+ if (iter.hasNext()) {
+ current = iter.next()
+ return Awaitable.of(true)
+ }
+ return Awaitable.of(false)
+ }
+ Object getCurrent() { return current }
+ }
+ }
+ })
+
+ def results = []
+ def stream = toAsyncStream([10, 20, 30])
+ for await (item in stream) {
+ results << item
+ }
+ assert results == [10, 20, 30]
+ '''
+ }
+
+ @Test
+ void testAwaitObjectDispatchesToCorrectOverload() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ // Test Object overload dispatch for each type
+ Object a = Awaitable.of("awaitable")
+ Object b = CompletableFuture.completedFuture("cf")
+ Object c = CompletableFuture.completedFuture("stage") as
java.util.concurrent.CompletionStage
+
+ assert await(a) == "awaitable"
+ assert await(b) == "cf"
+ assert await(c) == "stage"
+ '''
+ }
+
+ @Test
+ void testAsyncMethodWithParameters() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+
+ class ParamService {
+ async add(int a, int b) {
+ return a + b
+ }
+
+ async greet(String name) {
+ return "Hello, ${name}!"
+ }
+ }
+
+ def svc = new ParamService()
+ assert await(svc.add(3, 4)) == 7
+ assert await(svc.greet("Groovy")) == "Hello, Groovy!"
+ '''
+ }
+
+ @Test
+ void testAsyncChainedMethods() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ class Pipeline {
+ async step1() { return 1 }
+
+ async step2(Awaitable<Integer> input) {
+ return await(input) + 10
+ }
+
+ async step3(Awaitable<Integer> input) {
+ return await(input) * 2
+ }
+ }
+
+ def p = new Pipeline()
+ def r1 = p.step1()
+ def r2 = p.step2(r1)
+ def r3 = p.step3(r2)
+ assert r3.get() == 22
+ '''
+ }
+
+ @Test
+ void testAwaitSyntaxWithBinaryOperations() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.Awaitable
+
+ class MathService {
+ async compute(int v) { return v }
+ }
+
+ def svc = new MathService()
+ // Test that await properly integrates with binary operators
+ def a = svc.compute(10)
+ def b = svc.compute(20)
+ def r1 = await(a)
+ def r2 = await(b)
+ assert r1 + r2 == 30
+ '''
+ }
+
+ @Test
+ void testAsyncClosureWithoutYieldReturnIsNotGenerator() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ // Closure with no yield return produces an Awaitable (not
AsyncStream)
+ def result = async { 42 }
+ assert await(result) == 42
+
+ def resultNull = async { /* empty body */ }
+ assert await(resultNull) == null
+ '''
+ }
+
+ @Test
+ void testGroovyPromiseToString() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ def pending = new CompletableFuture()
+ def promise = GroovyPromise.of(pending)
+ assert promise.toString() == "GroovyPromise{pending}"
+
+ pending.complete(42)
+ assert promise.toString() == "GroovyPromise{completed}"
+
+ def failed = GroovyPromise.of(CompletableFuture.failedFuture(new
RuntimeException()))
+ assert failed.toString() == "GroovyPromise{failed}"
+ '''
+ }
+
+ @Test
+ void testGroovyPromiseGetWithTimeout() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.TimeUnit
+ import java.util.concurrent.TimeoutException
+
+ def awaitable = async { Thread.sleep(5000); "never" }
+ try {
+ awaitable.get(50, TimeUnit.MILLISECONDS)
+ assert false : "Should have timed out"
+ } catch (TimeoutException e) {
+ // expected
+ }
+ '''
Review Comment:
The async task here sleeps for 5 seconds and is never cancelled/cleaned up
after the timeout assertion. That leaves background work running and can
consume threads (especially on the JDK<21 fixed pool) and slow/flakify the test
suite. Prefer a much shorter sleep (still > timeout) and/or cancel the
awaitable in a `finally` block once the timeout is observed.
##########
src/test/groovy/org/codehaus/groovy/transform/AsyncTransformTest.groovy:
##########
@@ -0,0 +1,1764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform
+
+
+import org.codehaus.groovy.control.CompilationFailedException
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executors
+
+import static groovy.test.GroovyAssert.assertScript
+import static groovy.test.GroovyAssert.shouldFail
+import static groovy.concurrent.AsyncUtils.async
+import static groovy.concurrent.AsyncUtils.await
+import static groovy.concurrent.AsyncUtils.awaitAll
+import static groovy.concurrent.AsyncUtils.awaitAny
+import static groovy.concurrent.AsyncUtils.setExecutor
+
+/**
+ * Tests for async/await: @Async annotation, async/await keywords,
+ * Awaitable abstraction, for-await, and AsyncUtils.
+ */
+class AsyncTransformTest {
+
+ @AfterEach
+ void resetExecutor() {
+ setExecutor(null)
+ }
+
+ // ==== @Async annotation tests ====
+
+ @Test
+ void testBasicAsyncReturnsAwaitable() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Service {
+ @Async
+ def fetchData() {
+ return "hello"
+ }
+ }
+
+ def service = new Service()
+ def future = service.fetchData()
+ assert future instanceof Awaitable
+ assert future.get() == "hello"
+ '''
+ }
+
+ @Test
+ void testAsyncWithTypedReturnValue() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Service {
+ @Async
+ String compute() {
+ return "typed-result"
+ }
+ }
+
+ def service = new Service()
+ Awaitable<String> future = service.compute()
+ assert future.get() == "typed-result"
+ '''
+ }
+
+ @Test
+ void testAsyncWithIntReturnValue() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Calculator {
+ @Async
+ int add(int a, int b) {
+ return a + b
+ }
+ }
+
+ def calc = new Calculator()
+ def future = calc.add(3, 4)
+ assert future instanceof Awaitable
+ assert future.get() == 7
+ '''
+ }
+
+ @Test
+ void testAsyncVoidMethod() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicBoolean
+
+ class Worker {
+ AtomicBoolean executed = new AtomicBoolean(false)
+
+ @Async
+ void doWork() {
+ executed.set(true)
+ }
+ }
+
+ def worker = new Worker()
+ def future = worker.doWork()
+ assert future instanceof Awaitable
+ future.get()
+ assert worker.executed.get()
+ '''
+ }
+
+ @Test
+ void testAsyncStaticMethod() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Util {
+ @Async
+ static String process(String input) {
+ return input.toUpperCase()
+ }
+ }
+
+ def future = Util.process("hello")
+ assert future instanceof Awaitable
+ assert future.get() == "HELLO"
+ '''
+ }
+
+ // ==== await() call-style tests (backward compat) ====
+
+ @Test
+ void testAwaitCallInAsyncMethod() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def fetchAndProcess() {
+ def value = await(CompletableFuture.supplyAsync { 42 })
+ return value * 2
+ }
+ }
+
+ def service = new Service()
+ assert service.fetchAndProcess().get() == 84
+ '''
+ }
+
+ @Test
+ void testMultipleSequentialAwaits() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def combineResults() {
+ def a = await(CompletableFuture.supplyAsync { 10 })
+ def b = await(CompletableFuture.supplyAsync { 20 })
+ def c = await(CompletableFuture.supplyAsync { 30 })
+ return a + b + c
+ }
+ }
+
+ def service = new Service()
+ assert service.combineResults().get() == 60
+ '''
+ }
+
+ @Test
+ void testAwaitWithDependentFutures() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def pipeline() {
+ def raw = await(CompletableFuture.supplyAsync { "hello
world" })
+ def upper = await(CompletableFuture.supplyAsync {
raw.toUpperCase() })
+ return upper
+ }
+ }
+
+ def service = new Service()
+ assert service.pipeline().get() == "HELLO WORLD"
+ '''
+ }
+
+ @Test
+ void testNestedAwaitInExpression() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def nestedExpr() {
+ return await(CompletableFuture.supplyAsync { 5 }) +
await(CompletableFuture.supplyAsync { 3 })
+ }
+ }
+
+ def service = new Service()
+ assert service.nestedExpr().get() == 8
+ '''
+ }
+
+ @Test
+ void testAwaitInsideConditional() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def conditional(boolean flag) {
+ if (flag) {
+ return await(CompletableFuture.supplyAsync { "yes" })
+ } else {
+ return await(CompletableFuture.supplyAsync { "no" })
+ }
+ }
+ }
+
+ def service = new Service()
+ assert service.conditional(true).get() == "yes"
+ assert service.conditional(false).get() == "no"
+ '''
+ }
+
+ @Test
+ void testAwaitInsideLoop() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def sumAsync(int count) {
+ int sum = 0
+ for (int i = 1; i <= count; i++) {
+ sum += await(CompletableFuture.supplyAsync { i })
+ }
+ return sum
+ }
+ }
+
+ def service = new Service()
+ assert service.sumAsync(5).get() == 15
+ '''
+ }
+
+ @Test
+ void testAwaitInsideTryCatch() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def safeOperation() {
+ try {
+ def result = await(CompletableFuture.supplyAsync { 42
})
+ return result
+ } catch (Exception e) {
+ return -1
+ }
+ }
+ }
+
+ def service = new Service()
+ assert service.safeOperation().get() == 42
+ '''
+ }
+
+ @Test
+ void testAwaitInsideClosureInAsyncMethod() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def collectResults() {
+ def futures = [
+ CompletableFuture.supplyAsync { 1 },
+ CompletableFuture.supplyAsync { 2 },
+ CompletableFuture.supplyAsync { 3 }
+ ]
+ return futures.collect { f -> await(f) }
+ }
+ }
+
+ def service = new Service()
+ assert service.collectResults().get() == [1, 2, 3]
+ '''
+ }
+
+ // ==== Exception handling ====
+
+ @Test
+ void testAsyncExceptionPropagation() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.ExecutionException
+
+ class Service {
+ @Async
+ def failingMethod() {
+ throw new IllegalStateException("async failure")
+ }
+ }
+
+ def service = new Service()
+ def future = service.failingMethod()
+ try {
+ future.get()
+ assert false : "should have thrown"
+ } catch (ExecutionException e) {
+ assert e.cause instanceof IllegalStateException
+ assert e.cause.message == "async failure"
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitExceptionFromFuture() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+ import java.util.concurrent.ExecutionException
+
+ class Service {
+ @Async
+ def awaitFailing() {
+ return await(CompletableFuture.supplyAsync {
+ throw new IllegalArgumentException("inner failure")
+ })
+ }
+ }
+
+ def service = new Service()
+ try {
+ service.awaitFailing().get()
+ assert false : "should have thrown"
+ } catch (ExecutionException e) {
+ assert e.cause instanceof IllegalArgumentException
+ assert e.cause.message == "inner failure"
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitExceptionCaughtInTryCatch() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def recoverFromError() {
+ try {
+ return await(CompletableFuture.supplyAsync {
+ throw new IllegalArgumentException("oops")
+ })
+ } catch (IllegalArgumentException e) {
+ return "recovered: " + e.message
+ }
+ }
+ }
+
+ def service = new Service()
+ assert service.recoverFromError().get() == "recovered: oops"
+ '''
+ }
+
+ // ==== Custom executor ====
+
+ @Test
+ void testAsyncWithCustomExecutor() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.Executors
+
+ class Service {
+ def myPool = Executors.newSingleThreadExecutor()
+
+ @Async(executor = "myPool")
+ def runOnCustomPool() {
+ return Thread.currentThread().name
+ }
+ }
+
+ def service = new Service()
+ def threadName = service.runOnCustomPool().get()
+ assert threadName.contains("pool")
+ service.myPool.shutdown()
+ '''
+ }
+
+ @Test
+ void testAsyncStaticMethodWithStaticExecutor() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.Executors
+
+ class Service {
+ static def pool = Executors.newSingleThreadExecutor()
+
+ @Async(executor = "pool")
+ static def runStatic() {
+ return "static-result"
+ }
+ }
+
+ assert Service.runStatic().get() == "static-result"
+ Service.pool.shutdown()
+ '''
+ }
+
+ // ==== Concurrency ====
+
+ @Test
+ void testAsyncMethodRunsOnDifferentThread() {
+ assertScript '''
+ import groovy.transform.Async
+
+ class Service {
+ @Async
+ def getThreadName() {
+ return Thread.currentThread().name
+ }
+ }
+
+ def service = new Service()
+ def callerThread = Thread.currentThread().name
+ def asyncThread = service.getThreadName().get()
+ assert asyncThread != callerThread ||
asyncThread.contains("ForkJoinPool")
+ '''
+ }
+
+ @Test
+ void testConcurrentAsyncExecution() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CountDownLatch
+ import java.util.concurrent.atomic.AtomicInteger
+
+ class Service {
+ AtomicInteger concurrentCount = new AtomicInteger(0)
+ AtomicInteger maxConcurrent = new AtomicInteger(0)
+
+ @Async
+ def concurrentTask(CountDownLatch startLatch) {
+ int current = concurrentCount.incrementAndGet()
+ maxConcurrent.updateAndGet { max -> Math.max(max, current)
}
+ startLatch.await(5, java.util.concurrent.TimeUnit.SECONDS)
+ concurrentCount.decrementAndGet()
+ return current
+ }
+ }
+
+ def service = new Service()
+ def latch = new CountDownLatch(1)
+ def futures = (1..10).collect { service.concurrentTask(latch) }
+ Thread.sleep(500)
+ latch.countDown()
+ futures.each { it.get(5, java.util.concurrent.TimeUnit.SECONDS) }
+ assert service.maxConcurrent.get() > 1
+ '''
+ }
+
+ @Test
+ void testHighConcurrencyStressTest() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.atomic.AtomicInteger
+
+ class Service {
+ AtomicInteger counter = new AtomicInteger(0)
+
+ @Async
+ def increment() {
+ counter.incrementAndGet()
+ return counter.get()
+ }
+ }
+
+ def service = new Service()
+ int taskCount = 1000
+ def futures = (1..taskCount).collect { service.increment() }
+ futures.each { it.get(10, java.util.concurrent.TimeUnit.SECONDS) }
+ assert service.counter.get() == taskCount
+ '''
+ }
+
+ @Test
+ void testParallelAwaitPattern() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ CompletableFuture<Integer> asyncCompute(int value) {
+ return CompletableFuture.supplyAsync {
+ Thread.sleep(50)
+ return value * 2
+ }
+ }
+
+ @Async
+ def parallelFetch() {
+ def f1 = asyncCompute(1)
+ def f2 = asyncCompute(2)
+ def f3 = asyncCompute(3)
+ return await(f1) + await(f2) + await(f3)
+ }
+ }
+
+ def service = new Service()
+ long start = System.currentTimeMillis()
+ def result = service.parallelFetch().get()
+ long elapsed = System.currentTimeMillis() - start
+ assert result == 12
+ assert elapsed < 500
+ '''
+ }
+
+ // ==== AsyncUtils standalone ====
+
+ @Test
+ void testAsyncUtilsAsyncClosure() {
+ def future = async { 42 }
+ assert future instanceof groovy.concurrent.Awaitable
+ assert future.get() == 42
+ }
+
+ @Test
+ void testAsyncUtilsAwait() {
+ def future = CompletableFuture.supplyAsync { "hello" }
+ def result = await(future)
+ assert result == "hello"
+ }
+
+ @Test
+ void testAsyncUtilsAwaitAll() {
+ def f1 = CompletableFuture.supplyAsync { 1 }
+ def f2 = CompletableFuture.supplyAsync { 2 }
+ def f3 = CompletableFuture.supplyAsync { 3 }
+ def results = awaitAll(f1, f2, f3)
+ assert results == [1, 2, 3]
+ }
+
+ @Test
+ void testAsyncUtilsAwaitAny() {
+ def f1 = CompletableFuture.supplyAsync { "first" }
+ def f2 = new CompletableFuture()
+ def f3 = new CompletableFuture()
+ def result = awaitAny(f1, f2, f3)
+ assert result == "first"
+ }
+
+ @Test
+ void testAsyncUtilsSetCustomExecutor() {
+ def pool = Executors.newSingleThreadExecutor()
+ try {
+ setExecutor(pool)
+ def future = async { Thread.currentThread().name }
+ assert future.get().contains("pool")
+ } finally {
+ setExecutor(null)
+ pool.shutdown()
+ }
+ }
+
+ @Test
+ void testAsyncUtilsAwaitFutureInterface() {
+ def pool = Executors.newSingleThreadExecutor()
+ try {
+ java.util.concurrent.Future<String> future = pool.submit({
"from-executor" } as java.util.concurrent.Callable)
+ def result = await(future)
+ assert result == "from-executor"
+ } finally {
+ pool.shutdown()
+ }
+ }
+
+ @Test
+ void testAsyncUtilsAwaitExceptionUnwrapping() {
+ def future = CompletableFuture.supplyAsync {
+ throw new IllegalStateException("test error")
+ }
+ try {
+ await(future)
+ assert false : "should have thrown"
+ } catch (IllegalStateException e) {
+ assert e.message == "test error"
+ }
+ }
+
+ // ==== Compilation errors ====
+
+ @Test
+ void testAsyncOnAbstractMethodFails() {
+ def err = shouldFail CompilationFailedException, '''
+ import groovy.transform.Async
+
+ abstract class Service {
+ @Async
+ abstract def fetchData()
+ }
+ '''
+ assert err.message.contains("cannot be applied to abstract method")
+ }
+
+ @Test
+ void testAsyncOnAwaitableReturnTypeFails() {
+ def err = shouldFail CompilationFailedException, '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Service {
+ @Async
+ Awaitable<String> fetchData() {
+ return groovy.concurrent.Awaitable.of("hello")
+ }
+ }
+ '''
+ assert err.message.contains("already returns an async type")
+ }
+
+ @Test
+ void testAsyncWithMissingExecutorFieldFails() {
+ def err = shouldFail CompilationFailedException, '''
+ import groovy.transform.Async
+
+ class Service {
+ @Async(executor = "nonExistentField")
+ def fetchData() {
+ return "hello"
+ }
+ }
+ '''
+ assert err.message.contains("executor field 'nonExistentField' not
found")
+ }
+
+ @Test
+ void testAsyncStaticMethodWithInstanceExecutorFails() {
+ def err = shouldFail CompilationFailedException, '''
+ import groovy.transform.Async
+ import java.util.concurrent.Executors
+
+ class Service {
+ def myPool = Executors.newSingleThreadExecutor()
+
+ @Async(executor = "myPool")
+ static def fetchData() {
+ return "hello"
+ }
+ }
+ '''
+ assert err.message.contains("must be static for static method")
+ }
+
+ // ==== Static import async/await ====
+
+ @Test
+ void testAsyncClosureWithStaticImport() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.CompletableFuture
+
+ def future = async {
+ def a = await(CompletableFuture.supplyAsync { 10 })
+ def b = await(CompletableFuture.supplyAsync { 20 })
+ return a + b
+ }
+ assert future.get() == 30
+ '''
+ }
+
+ @Test
+ void testAsyncClosureExceptionHandling() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.ExecutionException
+
+ def future = async {
+ throw new IllegalStateException("closure error")
+ }
+ try {
+ future.get()
+ assert false : "should have thrown"
+ } catch (ExecutionException e) {
+ assert e.cause instanceof IllegalStateException
+ assert e.cause.message == "closure error"
+ }
+ '''
+ }
+
+ // ==== Integration scenarios ====
+
+ @Test
+ void testAsyncProducerConsumerPattern() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Producer {
+ @Async
+ def produce(int id) {
+ Thread.sleep(10)
+ return "item-${id}"
+ }
+ }
+
+ class Consumer {
+ @Async
+ def consume(Awaitable<String> itemFuture) {
+ def item = await(itemFuture)
+ return "consumed: ${item}"
+ }
+ }
+
+ def producer = new Producer()
+ def consumer = new Consumer()
+ def item = producer.produce(1)
+ def result = consumer.consume(item)
+ assert result.get() == "consumed: item-1"
+ '''
+ }
+
+ @Test
+ void testAsyncChainedCalls() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ class Pipeline {
+ @Async
+ def step1() { return 1 }
+
+ @Async
+ def step2(Awaitable<Integer> input) {
+ return await(input) + 10
+ }
+
+ @Async
+ def step3(Awaitable<Integer> input) {
+ return await(input) * 2
+ }
+ }
+
+ def p = new Pipeline()
+ def r1 = p.step1()
+ def r2 = p.step2(r1)
+ def r3 = p.step3(r2)
+ assert r3.get() == 22
+ '''
+ }
+
+ @Test
+ void testAsyncWithMethodParameters() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def greet(String name, int times) {
+ def parts = []
+ for (int i = 0; i < times; i++) {
+ parts << await(CompletableFuture.supplyAsync { "Hello
${name}" })
+ }
+ return parts.join(", ")
+ }
+ }
+
+ def service = new Service()
+ assert service.greet("World", 3).get() == "Hello World, Hello
World, Hello World"
+ '''
+ }
+
+ @Test
+ void testAsyncWithCollectionProcessing() {
+ assertScript '''
+ import groovy.transform.Async
+ import java.util.concurrent.CompletableFuture
+
+ class BatchProcessor {
+ @Async
+ def processBatch(List<Integer> items) {
+ def futures = items.collect { item ->
+ CompletableFuture.supplyAsync { item * item }
+ }
+ return futures.collect { f -> await(f) }
+ }
+ }
+
+ def processor = new BatchProcessor()
+ assert processor.processBatch([1, 2, 3, 4, 5]).get() == [1, 4, 9,
16, 25]
+ '''
+ }
+
+ @Test
+ void testAsyncAwaitAllPattern() {
+ assertScript '''
+ import groovy.transform.Async
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @Async
+ def fetchAll() {
+ def f1 = CompletableFuture.supplyAsync { "a" }
+ def f2 = CompletableFuture.supplyAsync { "b" }
+ def f3 = CompletableFuture.supplyAsync { "c" }
+ return awaitAll(f1, f2, f3)
+ }
+ }
+
+ def service = new Service()
+ assert service.fetchAll().get() == ["a", "b", "c"]
+ '''
+ }
+
+ @Test
+ void testAsyncInScript() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+
+ @Async
+ def asyncAdd(int a, int b) {
+ return a + b
+ }
+
+ def future = asyncAdd(3, 4)
+ assert future instanceof Awaitable
+ assert future.get() == 7
+ '''
+ }
+
+ // ==== 'async' keyword modifier ====
+
+ @Test
+ void testAsyncKeywordModifier() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Service {
+ async def fetchData() {
+ return "from-async-keyword"
+ }
+ }
+
+ def service = new Service()
+ def result = service.fetchData()
+ assert result instanceof Awaitable
+ assert result.get() == "from-async-keyword"
+ '''
+ }
+
+ @Test
+ void testAsyncKeywordWithAwaitKeyword() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ async def compute() {
+ def a = await CompletableFuture.supplyAsync { 10 }
+ def b = await CompletableFuture.supplyAsync { 20 }
+ return a + b
+ }
+ }
+
+ def service = new Service()
+ assert service.compute().get() == 30
+ '''
+ }
+
+ @Test
+ void testAsyncKeywordStaticMethod() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Util {
+ async static process(String input) {
+ return input.toUpperCase()
+ }
+ }
+
+ def result = Util.process("hello")
+ assert result instanceof Awaitable
+ assert result.get() == "HELLO"
+ '''
+ }
+
+ @Test
+ void testAsyncKeywordVoidMethod() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.atomic.AtomicBoolean
+
+ class Worker {
+ AtomicBoolean done = new AtomicBoolean(false)
+
+ async void doWork() {
+ done.set(true)
+ }
+ }
+
+ def worker = new Worker()
+ def result = worker.doWork()
+ assert result instanceof Awaitable
+ result.get()
+ assert worker.done.get()
+ '''
+ }
+
+ @Test
+ void testAsyncKeywordInScript() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ async def add(int a, int b) {
+ return a + b
+ }
+
+ def result = add(5, 7)
+ assert result instanceof Awaitable
+ assert result.get() == 12
+ '''
+ }
+
+ // ==== 'await' keyword expression ====
+
+ @Test
+ void testAwaitKeywordExpression() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ async def fetchAndDouble() {
+ def value = await CompletableFuture.supplyAsync { 21 }
+ return value * 2
+ }
+ }
+
+ def service = new Service()
+ assert service.fetchAndDouble().get() == 42
+ '''
+ }
+
+ @Test
+ void testAwaitKeywordPrecedence() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ async def addAwaits() {
+ // 'await' should bind to the immediate expression, not to
the sum
+ // await a + await b => (await a) + (await b)
+ return (await CompletableFuture.supplyAsync { 10 }) +
(await CompletableFuture.supplyAsync { 5 })
+ }
+ }
+
+ def service = new Service()
+ assert service.addAwaits().get() == 15
+ '''
+ }
+
+ @Test
+ void testAwaitKeywordWithParentheses() {
+ assertScript '''
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ @groovy.transform.Async
+ def compute() {
+ // await(expr) still works as await keyword +
parenthesized expression
+ def a = await(CompletableFuture.supplyAsync { 100 })
+ return a
+ }
+ }
+
+ def service = new Service()
+ assert service.compute().get() == 100
+ '''
+ }
+
+ @Test
+ void testAwaitKeywordWithMethodCall() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ CompletableFuture<String> fetchRemote() {
+ CompletableFuture.supplyAsync { "remote-data" }
+ }
+
+ async def process() {
+ def data = await fetchRemote()
+ return "processed: ${data}"
+ }
+ }
+
+ def service = new Service()
+ assert service.process().get() == "processed: remote-data"
+ '''
+ }
+
+ @Test
+ void testAwaitKeywordInConditional() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class Service {
+ async def check(boolean flag) {
+ if (flag) {
+ return await CompletableFuture.supplyAsync { "yes" }
+ } else {
+ return await CompletableFuture.supplyAsync { "no" }
+ }
+ }
+ }
+
+ def service = new Service()
+ assert service.check(true).get() == "yes"
+ assert service.check(false).get() == "no"
+ '''
+ }
+
+ // ==== Awaitable abstraction tests ====
+
+ @Test
+ void testAwaitableInterface() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ // Test Awaitable.of()
+ def a = Awaitable.of("hello")
+ assert a.get() == "hello"
+ assert a.isDone()
+
+ // Test Awaitable.failed()
+ def f = Awaitable.failed(new RuntimeException("oops"))
+ try {
+ f.get()
+ assert false
+ } catch (java.util.concurrent.ExecutionException e) {
+ assert e.cause.message == "oops"
+ }
+ '''
+ }
+
+ @Test
+ void testGroovyPromiseThen() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ def p = GroovyPromise.of(CompletableFuture.supplyAsync { 10 })
+ def p2 = p.then { it * 3 }
+ assert p2.get() == 30
+ '''
+ }
+
+ @Test
+ void testGroovyPromiseThenCompose() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ def p = GroovyPromise.of(CompletableFuture.supplyAsync { 5 })
+ def p2 = p.thenCompose { val ->
+ GroovyPromise.of(CompletableFuture.supplyAsync { val * 4 })
+ }
+ assert p2.get() == 20
+ '''
+ }
+
+ @Test
+ void testGroovyPromiseExceptionally() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ def p = GroovyPromise.of(CompletableFuture.supplyAsync { throw new
RuntimeException("fail") })
+ def recovered = p.exceptionally { t -> "recovered" }
+ assert recovered.get() == "recovered"
+ '''
+ }
+
+ @Test
+ void testAwaitableToCompletableFuture() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ def p =
GroovyPromise.of(CompletableFuture.completedFuture("interop"))
+ CompletableFuture<String> cf = p.toCompletableFuture()
+ assert cf.get() == "interop"
+ '''
+ }
+
+ // ==== AwaitableAdapterRegistry tests ====
+
+ @Test
+ void testAdapterRegistryWithCompletableFuture() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.AwaitableAdapterRegistry
+ import java.util.concurrent.CompletableFuture
+
+ def cf = CompletableFuture.completedFuture("adapted")
+ Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(cf)
+ assert a.get() == "adapted"
+ '''
+ }
+
+ @Test
+ void testAdapterRegistryPassthroughAwaitable() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.AwaitableAdapterRegistry
+
+ def original = Awaitable.of(42)
+ def adapted = AwaitableAdapterRegistry.toAwaitable(original)
+ assert adapted.is(original)
+ '''
+ }
+
+ @Test
+ void testAdapterRegistryCustomAdapter() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.AwaitableAdapter
+ import groovy.concurrent.AwaitableAdapterRegistry
+ import org.apache.groovy.runtime.async.GroovyPromise
+ import java.util.concurrent.CompletableFuture
+
+ // Simulate a custom async type
+ class CustomPromise {
+ final String value
+ CustomPromise(String v) { this.value = v }
+ }
+
+ // Register custom adapter
+ AwaitableAdapterRegistry.register(new AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) {
CustomPromise.isAssignableFrom(type) }
+ def <T> Awaitable<T> toAwaitable(Object source) {
+ return Awaitable.of(((CustomPromise) source).value)
+ }
+ })
+
+ def custom = new CustomPromise("custom-value")
+ Awaitable<String> a = AwaitableAdapterRegistry.toAwaitable(custom)
+ assert a.get() == "custom-value"
+ '''
+ }
+
+ @Test
+ void testAwaitWithCustomAdaptedType() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import groovy.concurrent.AwaitableAdapter
+ import groovy.concurrent.AwaitableAdapterRegistry
+ import static groovy.concurrent.AsyncUtils.*
+
+ class Deferred {
+ final Object result
+ Deferred(Object r) { this.result = r }
+ }
+
+ AwaitableAdapterRegistry.register(new AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) {
Deferred.isAssignableFrom(type) }
+ def <T> Awaitable<T> toAwaitable(Object source) {
+ return Awaitable.of(((Deferred) source).result)
+ }
+ })
+
+ // await works with custom types via adapter
+ def d = new Deferred("deferred-value")
+ def result = await(d)
+ assert result == "deferred-value"
+ '''
+ }
+
+ // ==== for-await tests ====
+
+ @Test
+ void testForAwaitWithList() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Processor {
+ @groovy.transform.Async
+ def processAll() {
+ def results = []
+ for await (item in [1, 2, 3, 4, 5]) {
+ results << item * 2
+ }
+ return results
+ }
+ }
+
+ def p = new Processor()
+ assert p.processAll().get() == [2, 4, 6, 8, 10]
+ '''
+ }
+
+ @Test
+ void testForAwaitWithBreak() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Processor {
+ @groovy.transform.Async
+ def findFirst() {
+ for await (item in [10, 20, 30, 40]) {
+ if (item > 20) {
+ return item
+ }
+ }
+ return -1
+ }
+ }
+
+ def p = new Processor()
+ assert p.findFirst().get() == 30
+ '''
+ }
+
+ @Test
+ void testForAwaitWithAsyncKeyword() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+
+ class Worker {
+ async def sumItems() {
+ int sum = 0
+ for await (item in [1, 2, 3]) {
+ sum += item
+ }
+ return sum
+ }
+ }
+
+ def worker = new Worker()
+ assert worker.sumItems().get() == 6
+ '''
+ }
+
+ // ==== 'async' and 'await' as identifiers (backward compat) ====
+
+ @Test
+ void testAsyncAsVariableName() {
+ assertScript '''
+ def async = 42
+ assert async == 42
+ '''
+ }
+
+ @Test
+ void testAwaitAsVariableName() {
+ assertScript '''
+ def await = "hello"
+ assert await == "hello"
+ '''
+ }
+
+ @Test
+ void testAsyncAsMethodName() {
+ assertScript '''
+ class Compat {
+ def async() { return "compatible" }
+ }
+ assert new Compat().async() == "compatible"
+ '''
+ }
+
+ // ==== Combined async/await keyword with Awaitable ====
+
+ @Test
+ void testFullAsyncAwaitPipeline() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class DataService {
+ async def fetchUser(long id) {
+ return [name: "User${id}"]
+ }
+
+ async def fetchOrders(long id) {
+ return ["order1", "order2"]
+ }
+
+ async def getUserSummary(long id) {
+ def user = await fetchUser(id)
+ def orders = await fetchOrders(id)
+ return [user: user, orders: orders]
+ }
+ }
+
+ def service = new DataService()
+ def summary = service.getUserSummary(1).get()
+ assert summary.user.name == "User1"
+ assert summary.orders.size() == 2
+ '''
+ }
+
+ @Test
+ void testAsyncAwaitWithExceptionRecovery() {
+ assertScript '''
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class ResilientService {
+ async def fetchWithFallback() {
+ try {
+ return await CompletableFuture.supplyAsync { throw new
RuntimeException("primary failed") }
+ } catch (RuntimeException e) {
+ return "fallback-value"
+ }
+ }
+ }
+
+ def service = new ResilientService()
+ assert service.fetchWithFallback().get() == "fallback-value"
+ '''
+ }
+
+ @Test
+ void testAsyncAwaitMixedWithAnnotation() {
+ assertScript '''
+ import groovy.transform.Async
+ import groovy.concurrent.Awaitable
+ import java.util.concurrent.CompletableFuture
+
+ class MixedService {
+ // Annotation style
+ @Async
+ def annotationStyle() {
+ return await CompletableFuture.supplyAsync { "annotation" }
+ }
+
+ // Keyword style
+ async def keywordStyle() {
+ return await CompletableFuture.supplyAsync { "keyword" }
+ }
+
+ // They interoperate
+ async def combined() {
+ def a = await annotationStyle()
+ def b = await keywordStyle()
+ return "${a}+${b}"
+ }
+ }
+
+ def svc = new MixedService()
+ assert svc.combined().get() == "annotation+keyword"
+ '''
+ }
+
+ // ==== Coverage improvement tests ====
+
+ @Test
+ void testAwaitableAdapterRegistryUnregister() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ class FakePromise { String val }
+
+ def adapter = new AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) {
FakePromise.isAssignableFrom(type) }
+ def <T> Awaitable<T> toAwaitable(Object source) {
Awaitable.of(((FakePromise) source).val) }
+ }
+
+ AwaitableAdapterRegistry.register(adapter)
+ assert await(new FakePromise(val: "hello")) == "hello"
+
+ assert AwaitableAdapterRegistry.unregister(adapter)
+ assert !AwaitableAdapterRegistry.unregister(adapter) // already
removed
+
+ try {
+ AwaitableAdapterRegistry.toAwaitable(new FakePromise(val: "x"))
+ assert false : "should have thrown"
+ } catch (IllegalArgumentException expected) {
+ assert expected.message.contains("No AwaitableAdapter")
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryRegisterReturnsAutoCloseable() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ class Token { int id }
+
+ def handle = AwaitableAdapterRegistry.register(new
AwaitableAdapter() {
+ boolean supportsAwaitable(Class<?> type) {
Token.isAssignableFrom(type) }
+ def <T> Awaitable<T> toAwaitable(Object source) {
Awaitable.of(((Token) source).id) }
+ })
+
+ assert await(new Token(id: 42)) == 42
+ handle.close() // unregister via AutoCloseable
+
+ try {
+ AwaitableAdapterRegistry.toAwaitable(new Token(id: 1))
+ assert false
+ } catch (IllegalArgumentException expected) { }
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistrySetBlockingExecutor() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ def customExecutor = Executors.newSingleThreadExecutor()
+ try {
+ AwaitableAdapterRegistry.setBlockingExecutor(customExecutor)
+ // Create a plain Future (not CompletableFuture) to exercise
blocking path
+ def pool = Executors.newSingleThreadExecutor()
+ Future<String> future = pool.submit({ "from-blocking-future" }
as Callable)
+ Thread.sleep(50) // let it complete
+ def result = await(future)
+ assert result == "from-blocking-future"
+ pool.shutdown()
+ } finally {
+ AwaitableAdapterRegistry.setBlockingExecutor(null)
+ customExecutor.shutdown()
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryToAsyncStreamWithIterator() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ // Test with Iterator directly
+ def iter = [10, 20, 30].iterator()
+ AsyncStream stream = AwaitableAdapterRegistry.toAsyncStream(iter)
+ def results = []
+ while (await(stream.moveNext())) {
+ results << stream.getCurrent()
+ }
+ assert results == [10, 20, 30]
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryToAsyncStreamUnsupportedType() {
+ assertScript '''
+ import groovy.concurrent.*
+
+ try {
+ AwaitableAdapterRegistry.toAsyncStream("a string")
+ assert false : "should throw"
+ } catch (IllegalArgumentException expected) {
+ assert expected.message.contains("No AsyncStream adapter")
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryToAwaitableUnsupportedType() {
+ assertScript '''
+ import groovy.concurrent.*
+
+ try {
+ AwaitableAdapterRegistry.toAwaitable("plain string")
+ assert false : "should throw"
+ } catch (IllegalArgumentException expected) {
+ assert expected.message.contains("No AwaitableAdapter")
+ }
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryPlainFutureNotDone() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ // Create a Future that completes with a delay (not done at
adaptation time)
+ def pool = Executors.newSingleThreadExecutor()
+ Future<Integer> future = pool.submit({
+ Thread.sleep(50)
+ return 99
+ } as Callable)
+ // Future is NOT done yet at this point
+ def result = await(future)
+ assert result == 99
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testAwaitableAdapterRegistryPlainFutureDone() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ // Create a Future that is already done
+ def pool = Executors.newSingleThreadExecutor()
+ Future<Integer> future = pool.submit({ 77 } as Callable)
+ Thread.sleep(100)
+ assert future.isDone()
+ def result = await(future)
+ assert result == 77
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testAsyncStreamGeneratorErrorPropagation() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ @groovy.transform.Async
+ def failingGenerator() {
+ yield return "ok"
+ throw new IOException("generator failed")
+ }
+
+ def stream = failingGenerator()
+ assert await(stream.moveNext()) == true
+ assert stream.getCurrent() == "ok"
+
+ try {
+ await(stream.moveNext())
+ assert false : "should throw"
+ } catch (IOException expected) {
+ assert expected.message == "generator failed"
+ }
+ '''
+ }
+
+ @Test
+ void testAsyncStreamGeneratorMultipleYields() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ @groovy.transform.Async
+ def counter() {
+ for (i in 1..5) {
+ yield return i
+ }
+ }
+
+ def stream = counter()
+ def items = []
+ while (await(stream.moveNext())) {
+ items << stream.getCurrent()
+ }
+ assert items == [1, 2, 3, 4, 5]
+ '''
+ }
+
+ @Test
+ void testAsyncStreamEmpty() {
+ assertScript '''
+ import groovy.concurrent.*
+ import static groovy.concurrent.AsyncUtils.*
+
+ def stream = AsyncStream.empty()
+ assert await(stream.moveNext()) == false
+ '''
+ }
+
+ @Test
+ void testAwaitNullReturnsNull() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+
+ def result = await((Object) null)
+ assert result == null
+ '''
+ }
+
+ @Test
+ void testAwaitCompletionStage() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ CompletionStage<String> stage =
CompletableFuture.completedFuture("stage-value")
+ def result = await(stage)
+ assert result == "stage-value"
+ '''
+ }
+
+ @Test
+ void testAwaitPlainFutureViaObjectOverload() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ // Use Object overload which dispatches to Future overload
+ def pool = Executors.newSingleThreadExecutor()
+ Object future = pool.submit({ "future-obj" } as Callable)
+ def result = await(future)
+ assert result == "future-obj"
+ pool.shutdown()
+ '''
+ }
+
+ @Test
+ void testToAsyncStreamWithNullReturnsEmpty() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import groovy.concurrent.AsyncStream
+
+ AsyncStream stream = toAsyncStream(null)
+ assert await(stream.moveNext()) == false
+ '''
+ }
+
+ @Test
+ void testDeepUnwrapWithNestedExceptions() {
+ assertScript '''
+ import static groovy.concurrent.AsyncUtils.*
+ import java.util.concurrent.*
+
+ def root = new IOException("root cause")
+ def l1 = new ExecutionException(root)
+ def l2 = new CompletionException(l1)
+ def l3 = new CompletionException(l2)
+
+ assert deepUnwrap(l3).is(root)
+ assert deepUnwrap(root).is(root)
+
+ // Non-wrapper exception is returned as-is
+ def re = new RuntimeException("direct")
+ assert deepUnwrap(re).is(re)
+ '''
+ }
+
+ @Test
+ void testBackwardCompatFacade() {
+ // Verify the deprecated groovy.transform.AsyncUtils still works
+ assertScript '''
+ import static groovy.transform.AsyncUtils.*
+
+ def result = await(async { 42 })
+ assert result == 42
+
+ def list = awaitAll(async { 1 }, async { 2 })
+ assert list == [1, 2]
+
+ def first = awaitAny(async { "fast" })
+ assert first == "fast"
+
+ setExecutor(null) // reset
+ '''
+ }
+
+ // ==== Fallback thread pool configuration tests ====
+
+ @Test
+ void testFallbackPoolUsesNamedDaemonThreads() {
+ assertScript '''
+ import org.apache.groovy.runtime.async.AsyncSupport
+ import groovy.concurrent.AsyncUtils
+
+ // When using the default executor, async tasks should run on
+ // named daemon threads (groovy-async-*) on JDK < 21, or
+ // virtual threads on JDK 21+.
+ def result = async {
+ return Thread.currentThread().name
+ }
+ def threadName = AsyncUtils.await(result)
+ assert threadName != null
+
+ if (AsyncSupport.isVirtualThreadsAvailable()) {
+ // Virtual threads have default empty names; verify task ran
+ assert true
+ } else {
+ // Fallback pool uses "groovy-async-<id>" naming
+ assert threadName.startsWith('groovy-async-') : \
+ "Expected 'groovy-async-*' but got: ${threadName}"
+ }
+ '''
+ }
+
+ @Test
+ void testCustomExecutorOverrideAndReset() {
+ assertScript '''
+ import groovy.concurrent.AsyncUtils
+ import java.util.concurrent.Executors
+ import java.util.concurrent.Executor
+
+ // Save original executor
+ def original = AsyncUtils.getExecutor()
+ assert original != null
+
+ // Override with custom executor
+ def customPool = Executors.newSingleThreadExecutor { r ->
+ def t = new Thread(r, 'my-custom-async')
+ t.daemon = true
+ t
+ }
+ try {
+ AsyncUtils.setExecutor(customPool)
+ assert AsyncUtils.getExecutor().is(customPool)
+
+ def threadName = AsyncUtils.await(async {
Thread.currentThread().name })
+ assert threadName == 'my-custom-async'
+ } finally {
+ // Reset to default
+ AsyncUtils.setExecutor(null)
+ assert AsyncUtils.getExecutor() != null
+ assert !AsyncUtils.getExecutor().is(customPool)
+ }
Review Comment:
`customPool` is never shut down. Even though the global executor is reset,
the created executor service will keep running threads and can leak across the
test suite. Please shut down the executor (and optionally await termination) in
the `finally` block after resetting the global executor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]