Copilot commented on code in PR #2481:
URL: https://github.com/apache/groovy/pull/2481#discussion_r3142862905
##########
src/main/java/org/apache/groovy/runtime/async/DefaultAsyncScope.java:
##########
@@ -302,6 +320,60 @@ private void pruneCompleted() {
}
}
+ // ---- Child scope management -----------------------------------------
+
+ void registerChildScope(DefaultAsyncScope child) {
+ synchronized (lock) {
+ if (!closed) {
+ childScopes.add(child);
+ }
+ }
+ }
+
+ void deregisterChildScope(DefaultAsyncScope child) {
+ synchronized (lock) {
+ childScopes.remove(child);
+ }
+ }
+
+ // ---- Timeout support ------------------------------------------------
+
+ /**
+ * Creates a scope with a timeout. If the body does not complete within
+ * the duration, all children are cancelled and {@link TimeoutException}
+ * is thrown.
+ */
+ public static <T> T withScopeTimeout(Executor executor, Duration timeout,
+ Function<AsyncScope, T> body) {
+ Objects.requireNonNull(timeout, "timeout must not be null");
+ Objects.requireNonNull(body, "body must not be null");
+ var timedOut = new java.util.concurrent.atomic.AtomicBoolean(false);
+ Thread bodyThread = Thread.currentThread();
+ try (DefaultAsyncScope scope = new DefaultAsyncScope(executor)) {
+ ScheduledFuture<?> timer = AsyncSupport.getScheduler().schedule(() -> {
+ timedOut.set(true);
+ scope.cancelAll();
+ bodyThread.interrupt();
+ }, timeout.toMillis(), TimeUnit.MILLISECONDS);
+ try {
+ T result = withCurrent(scope, () -> body.apply(scope));
+ timer.cancel(false);
+ return result;
+ } catch (Exception e) {
+ timer.cancel(false);
+ if (timedOut.get()) {
+ // Clear the interrupt flag set by the timeout
+ Thread.interrupted();
+ var te = new java.util.concurrent.CancellationException(
+ "Scope timed out after " + timeout);
+ te.initCause(e);
+ throw te;
+ }
+ throw e;
+ }
Review Comment:
withScopeTimeout only cancels the timer in the normal/Exception paths; if
the body throws an Error/Throwable that bypasses the catch, the scheduled
timeout task can still fire later and interrupt the current thread
unexpectedly. Use a finally block to always cancel the timer, and consider
catching Throwable (not just Exception) for consistent cleanup. Also, the
method javadoc says TimeoutException, but the implementation throws
CancellationException on timeout; align the thrown type with the documented
API contract.
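A minimal sketch of the finally-based cleanup described above, written against plain JDK types rather than the PR's classes. `TimeoutSketch`, `runWithTimeout`, and `SCHEDULER` are hypothetical names standing in for `withScopeTimeout` and `AsyncSupport.getScheduler()`, and it keeps CancellationException only because that is what the quoted code throws today.

```java
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class TimeoutSketch {
    // Hypothetical stand-in for AsyncSupport.getScheduler(); daemon thread so it never blocks JVM exit.
    static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-sketch");
                t.setDaemon(true);
                return t;
            });

    static <T> T runWithTimeout(Duration timeout, Supplier<T> body) {
        AtomicBoolean timedOut = new AtomicBoolean(false);
        Thread bodyThread = Thread.currentThread();
        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> {
            timedOut.set(true);
            bodyThread.interrupt();
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        try {
            return body.get();
        } catch (RuntimeException e) {
            if (timedOut.get()) {
                CancellationException ce = new CancellationException("Timed out after " + timeout);
                ce.initCause(e);
                throw ce;
            }
            throw e;
        } finally {
            // Reached on normal return, on Exception, and on Error alike,
            // so a pending timeout task cannot fire after the method has exited.
            timer.cancel(false);
            if (timedOut.get()) {
                Thread.interrupted(); // clear the flag set by the timeout task
            }
        }
    }
}
```

One narrow race remains in this sketch: if the timeout task is already running when `cancel(false)` is called, the interrupt can still land just after the method returns. Closing that window would need extra coordination between the task and the caller.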
##########
src/test/groovy/groovy/concurrent/FlowPublisherAdapterTest.groovy:
##########
@@ -82,11 +82,16 @@ final class FlowPublisherAdapterTest {
@Test
void testForAwaitIteratesAllValues() {
+ // SubmissionPublisher.submit delivers only to subscribers present at
+ // submit-time, so we must wait for `for await` to subscribe before
+ // the producer starts — otherwise on fast CI the producer races ahead
+ // and all 5 values are dropped before the consumer ever sees them.
assertScript '''
import java.util.concurrent.SubmissionPublisher
def publisher = new SubmissionPublisher<Integer>()
async {
+ while (publisher.numberOfSubscribers == 0) Thread.sleep(1)
(1..5).each { publisher.submit(it) }
Review Comment:
The test uses an unbounded busy-wait loop on numberOfSubscribers with
Thread.sleep(1). If subscription never happens (regression), this async task
can hang the test suite indefinitely. Add a timeout/deadline (e.g., max wait
duration then fail) or use a latch/handshake so failures terminate promptly.
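One possible shape for the deadline-based wait suggested above, sketched in Java. `BoundedWait` and `await` are hypothetical helper names, not part of the PR or of Groovy's test utilities; the same loop could equally be inlined in the Groovy script.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class BoundedWait {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean await(BooleanSupplier condition, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed: let the caller fail the test
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

A producer could then guard the submit loop with something like `if (!BoundedWait.await(() -> publisher.getNumberOfSubscribers() > 0, Duration.ofSeconds(5))) throw new AssertionError("no subscriber")`, so a regression fails within seconds instead of hanging the suite.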
##########
src/main/java/org/codehaus/groovy/runtime/metaclass/MetaClassRegistryImpl.java:
##########
@@ -186,6 +188,30 @@ public ExtensionModuleRegistry getModuleRegistry() {
*
* @see groovy.lang.MetaClassRegistry.MetaClassCreationHandle
*/
+ /**
+ * Detects when both Groovy core and groovy-concurrent-java are on the
+ * classpath — the same classes would exist in two jars. Logs a warning
+ * so Maven users (who lack Gradle's capability mechanism) are alerted.
+ */
+ private static void checkForDuplicateConcurrentModule() {
+ try {
+ var coreSource = groovy.lang.GroovyObject.class.getProtectionDomain().getCodeSource();
+ var concurrentSource = groovy.concurrent.AsyncScope.class.getProtectionDomain().getCodeSource();
+ if (coreSource != null && concurrentSource != null) {
+ var coreLoc = coreSource.getLocation();
+ var concurrentLoc = concurrentSource.getLocation();
+ if (coreLoc != null && concurrentLoc != null && !coreLoc.equals(concurrentLoc)) {
+ java.util.logging.Logger.getLogger(MetaClassRegistryImpl.class.getName()).warning(
+ "Both Groovy core (" + coreLoc + ") and groovy-concurrent-java (" + concurrentLoc
+ + ") are on the classpath. The concurrent API classes exist in both jars. "
+ + "Remove groovy-concurrent-java when using the full Groovy runtime.");
+ }
+ }
Review Comment:
The duplicate-module check compares CodeSource of GroovyObject.class vs
AsyncScope.class. If groovy.concurrent.AsyncScope is loaded from Groovy core
(likely when both jars are present), coreLoc == concurrentLoc and the warning
will not trigger even though groovy-concurrent-java is also on the classpath.
Consider detecting duplicates by enumerating resources (e.g.,
ClassLoader.getResources("groovy/concurrent/AsyncScope.class") and warning if
more than one distinct location is found).
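A rough sketch of the resource-enumeration approach, using only `ClassLoader.getResources`. `DuplicateClassCheck` and `locationsOf` are hypothetical names; which class loader to query and how to log the result are left open here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

final class DuplicateClassCheck {
    /** Returns every distinct location from which the loader can see the given resource. */
    static Set<String> locationsOf(ClassLoader loader, String resourcePath) {
        Set<String> locations = new LinkedHashSet<>();
        try {
            for (URL url : Collections.list(loader.getResources(resourcePath))) {
                locations.add(url.toExternalForm());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return locations;
    }
}
```

The check would then warn when `locationsOf(loader, "groovy/concurrent/AsyncScope.class").size() > 1`, which does not depend on which of the two jars happened to win class loading.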
##########
src/main/java/org/codehaus/groovy/transform/ActiveObjectASTTransformation.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform;
+
+import groovy.concurrent.Actor;
+import groovy.concurrent.Awaitable;
+import groovy.transform.ActiveMethod;
+import groovy.transform.ActiveObject;
+import org.apache.groovy.runtime.async.AsyncSupport;
+import org.codehaus.groovy.ast.ASTNode;
+import org.codehaus.groovy.ast.AnnotationNode;
+import org.codehaus.groovy.ast.ClassHelper;
+import org.codehaus.groovy.ast.ClassNode;
+import org.codehaus.groovy.ast.FieldNode;
+import org.codehaus.groovy.ast.MethodNode;
+import org.codehaus.groovy.ast.Parameter;
+import org.codehaus.groovy.ast.expr.ArgumentListExpression;
+import org.codehaus.groovy.ast.expr.ArrayExpression;
+import org.codehaus.groovy.ast.expr.ConstantExpression;
+import org.codehaus.groovy.ast.expr.Expression;
+import org.codehaus.groovy.ast.expr.MethodCallExpression;
+import org.codehaus.groovy.ast.expr.VariableExpression;
+import org.codehaus.groovy.ast.stmt.ExpressionStatement;
+import org.codehaus.groovy.ast.stmt.ReturnStatement;
+import org.codehaus.groovy.ast.stmt.Statement;
+import org.codehaus.groovy.control.CompilePhase;
+import org.codehaus.groovy.control.SourceUnit;
+import org.codehaus.groovy.runtime.InvokerHelper;
+
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.codehaus.groovy.ast.tools.GeneralUtils.args;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.callX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.classX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.constX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.fieldX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.varX;
+
+/**
+ * Handles the {@link ActiveObject} annotation, transforming
+ * {@link ActiveMethod}-annotated methods to route through an
+ * internal actor for serialised execution.
+ * <p>
+ * Inspired by GPars' {@code ActiveObjectASTTransformation},
+ * adapted for Groovy's built-in {@link Actor} infrastructure.
+ *
+ * @see ActiveObject
+ * @see ActiveMethod
+ * @since 6.0.0
+ */
+@GroovyASTTransformation(phase = CompilePhase.CANONICALIZATION)
+public class ActiveObjectASTTransformation extends AbstractASTTransformation {
+
+ private static final ClassNode ACTOR_TYPE = ClassHelper.makeWithoutCaching(Actor.class, false);
+ private static final ClassNode AWAITABLE_TYPE = ClassHelper.makeWithoutCaching(Awaitable.class, false);
+ private static final ClassNode ASYNC_SUPPORT_TYPE = ClassHelper.makeWithoutCaching(AsyncSupport.class, false);
+ private static final ClassNode INVOKER_HELPER_TYPE = ClassHelper.makeWithoutCaching(InvokerHelper.class, false);
+ private static final ClassNode ACTIVE_METHOD_TYPE = ClassHelper.makeWithoutCaching(ActiveMethod.class, false);
+
+ private static final String METHOD_NAME_PREFIX = "activeObject_";
+
+ @Override
+ public void visit(ASTNode[] nodes, SourceUnit source) {
+ init(nodes, source);
+ AnnotationNode annotation = (AnnotationNode) nodes[0];
+ ClassNode classNode = (ClassNode) nodes[1];
+
+ String actorFieldName = getMemberStringValue(annotation, "actorName", "internalActiveObjectActor");
+
+ // Add actor field if not already present
+ FieldNode actorField = classNode.getDeclaredField(actorFieldName);
+ if (actorField == null) {
+ actorField = addActorField(classNode, actorFieldName);
+ }
+
+ // Transform all @ActiveMethod methods
+ for (MethodNode method : new ArrayList<>(classNode.getMethods())) {
+ List<AnnotationNode> activeAnnotations = method.getAnnotations(ACTIVE_METHOD_TYPE);
+ if (!activeAnnotations.isEmpty()) {
+ if (method.isStatic()) {
+ addError("@ActiveMethod cannot be applied to static methods: " + method.getName(), method);
+ continue;
+ }
+ boolean blocking = memberHasValue(activeAnnotations.get(0), "blocking", false)
+ ? false : true; // default is true
+ transformMethod(classNode, method, actorField, blocking);
+ }
+ }
+ }
+
+ /**
+ * Adds the actor field to the class, initialised with a reactor
+ * that dispatches messages via InvokerHelper.
+ */
+ private FieldNode addActorField(ClassNode classNode, String fieldName) {
+ // Actor.reactor { msg -> InvokerHelper.invokeMethod(msg[0], msg[1], msg[2]) }
+ // The reactor receives [target, methodName, argsArray] and invokes the renamed method
+ Expression initializer = callX(
+ classX(ACTOR_TYPE),
+ "reactor",
+ args(buildDispatchLambda()));
+
+ return classNode.addField(fieldName,
+ Modifier.FINAL | Modifier.TRANSIENT | Modifier.PRIVATE,
+ ACTOR_TYPE,
+ initializer);
+ }
+
+ /**
+ * Builds the dispatch function:
+ * { msg -> InvokerHelper.invokeMethod(msg[0], "activeObject_" + msg[1], msg[2]) }
+ */
+ private Expression buildDispatchLambda() {
+ // We build a closure that:
+ // 1. Extracts target = msg[0], methodName = msg[1], args = msg[2]
+ // 2. Calls InvokerHelper.invokeMethod(target, "activeObject_" + methodName, args)
+ Parameter msgParam = new Parameter(ClassHelper.OBJECT_TYPE, "msg");
+ Expression msgVar = varX(msgParam);
+
+ // msg[0] = target, msg[1] = method name, msg[2] = args array
+ Expression target = callX(msgVar, "getAt", constX(0));
+ Expression methodName = callX(
+ constX(METHOD_NAME_PREFIX), "plus",
+ callX(msgVar, "getAt", constX(1)));
+ Expression argsExpr = callX(msgVar, "getAt", constX(2));
+
+ Expression invokeCall = callX(
+ classX(INVOKER_HELPER_TYPE),
+ "invokeMethod",
+ args(target, methodName, argsExpr));
+
+ Statement body = returnS(invokeCall);
+
+ org.codehaus.groovy.ast.expr.ClosureExpression closure =
+ new org.codehaus.groovy.ast.expr.ClosureExpression(
+ new Parameter[]{msgParam}, body);
+ closure.setVariableScope(new org.codehaus.groovy.ast.VariableScope());
+
+ return closure;
+ }
+
+ /**
+ * Transforms a single method:
+ * 1. Renames original to activeObject_methodName (private)
+ * 2. Replaces body with actor.sendAndGet([this, name, args]) dispatch
+ */
+ private void transformMethod(ClassNode classNode, MethodNode method,
+ FieldNode actorField, boolean blocking) {
+ String originalName = method.getName();
+ String renamedName = findUniqueName(classNode, originalName);
+
+ // Create the renamed private method with the original body
+ MethodNode renamed = new MethodNode(
+ renamedName,
+ Modifier.PRIVATE | Modifier.FINAL,
+ method.getReturnType(),
+ method.getParameters(),
+ method.getExceptions(),
+ method.getCode());
+ renamed.setSourcePosition(method);
+ classNode.addMethod(renamed);
+
+ // Build the message: [this, "originalName", [param1, param2, ...]]
+ List<Expression> argsList = new ArrayList<>();
+ for (Parameter p : method.getParameters()) {
+ argsList.add(varX(p.getName()));
+ }
+ Expression argsArray = new ArrayExpression(
+ ClassHelper.OBJECT_TYPE, argsList);
+
+ List<Expression> messageElements = new ArrayList<>();
+ messageElements.add(varX("this"));
+ messageElements.add(constX(originalName));
+ messageElements.add(argsArray);
+ Expression message = new org.codehaus.groovy.ast.expr.ListExpression(messageElements);
+
+ // actor.sendAndGet(message)
+ Expression sendCall = callX(
+ fieldX(actorField),
+ "sendAndGet",
+ args(message));
+
+ Statement newBody;
+ if (blocking) {
+ // AsyncSupport.await(actor.sendAndGet(message))
+ Expression awaitCall = callX(
+ classX(ASYNC_SUPPORT_TYPE),
+ "await",
+ args(sendCall));
+
+ if (method.getReturnType().equals(ClassHelper.VOID_TYPE)) {
+ newBody = stmt(awaitCall);
+ } else {
+ newBody = returnS(awaitCall);
+ }
+ } else {
+ // return actor.sendAndGet(message) — returns Awaitable
+ newBody = returnS(sendCall);
+ }
+
+ method.setCode(newBody);
+ }
+
+ private String findUniqueName(ClassNode classNode, String originalName) {
+ String candidate = METHOD_NAME_PREFIX + originalName;
+ int counter = 0;
+ while (classNode.getDeclaredMethod(candidate, Parameter.EMPTY_ARRAY) != null) {
Review Comment:
findUniqueName checks for collisions using getDeclaredMethod(candidate,
Parameter.EMPTY_ARRAY), which only detects an existing no-arg method. If the
class already has a method with the candidate name but different parameters,
this will still generate a duplicate method name and can fail compilation.
Check for any declared method with that name (or check the full signature set),
e.g. classNode.getMethods(candidate) / getDeclaredMethods(...) and consider
parameter lists.
```suggestion
while (!classNode.getDeclaredMethods(candidate).isEmpty()) {
```
##########
src/main/java/org/codehaus/groovy/transform/ActiveObjectASTTransformation.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.transform;
+
+import groovy.concurrent.Actor;
+import groovy.concurrent.Awaitable;
+import groovy.transform.ActiveMethod;
+import groovy.transform.ActiveObject;
+import org.apache.groovy.runtime.async.AsyncSupport;
+import org.codehaus.groovy.ast.ASTNode;
+import org.codehaus.groovy.ast.AnnotationNode;
+import org.codehaus.groovy.ast.ClassHelper;
+import org.codehaus.groovy.ast.ClassNode;
+import org.codehaus.groovy.ast.FieldNode;
+import org.codehaus.groovy.ast.MethodNode;
+import org.codehaus.groovy.ast.Parameter;
+import org.codehaus.groovy.ast.expr.ArgumentListExpression;
+import org.codehaus.groovy.ast.expr.ArrayExpression;
+import org.codehaus.groovy.ast.expr.ConstantExpression;
+import org.codehaus.groovy.ast.expr.Expression;
+import org.codehaus.groovy.ast.expr.MethodCallExpression;
+import org.codehaus.groovy.ast.expr.VariableExpression;
+import org.codehaus.groovy.ast.stmt.ExpressionStatement;
+import org.codehaus.groovy.ast.stmt.ReturnStatement;
+import org.codehaus.groovy.ast.stmt.Statement;
+import org.codehaus.groovy.control.CompilePhase;
+import org.codehaus.groovy.control.SourceUnit;
+import org.codehaus.groovy.runtime.InvokerHelper;
+
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.codehaus.groovy.ast.tools.GeneralUtils.args;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.callX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.classX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.constX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.fieldX;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt;
+import static org.codehaus.groovy.ast.tools.GeneralUtils.varX;
+
+/**
+ * Handles the {@link ActiveObject} annotation, transforming
+ * {@link ActiveMethod}-annotated methods to route through an
+ * internal actor for serialised execution.
+ * <p>
+ * Inspired by GPars' {@code ActiveObjectASTTransformation},
+ * adapted for Groovy's built-in {@link Actor} infrastructure.
+ *
+ * @see ActiveObject
+ * @see ActiveMethod
+ * @since 6.0.0
+ */
+@GroovyASTTransformation(phase = CompilePhase.CANONICALIZATION)
+public class ActiveObjectASTTransformation extends AbstractASTTransformation {
+
+ private static final ClassNode ACTOR_TYPE = ClassHelper.makeWithoutCaching(Actor.class, false);
+ private static final ClassNode AWAITABLE_TYPE = ClassHelper.makeWithoutCaching(Awaitable.class, false);
+ private static final ClassNode ASYNC_SUPPORT_TYPE = ClassHelper.makeWithoutCaching(AsyncSupport.class, false);
+ private static final ClassNode INVOKER_HELPER_TYPE = ClassHelper.makeWithoutCaching(InvokerHelper.class, false);
+ private static final ClassNode ACTIVE_METHOD_TYPE = ClassHelper.makeWithoutCaching(ActiveMethod.class, false);
+
+ private static final String METHOD_NAME_PREFIX = "activeObject_";
+
+ @Override
+ public void visit(ASTNode[] nodes, SourceUnit source) {
+ init(nodes, source);
+ AnnotationNode annotation = (AnnotationNode) nodes[0];
+ ClassNode classNode = (ClassNode) nodes[1];
+
+ String actorFieldName = getMemberStringValue(annotation, "actorName", "internalActiveObjectActor");
+
+ // Add actor field if not already present
+ FieldNode actorField = classNode.getDeclaredField(actorFieldName);
+ if (actorField == null) {
+ actorField = addActorField(classNode, actorFieldName);
+ }
+
+ // Transform all @ActiveMethod methods
+ for (MethodNode method : new ArrayList<>(classNode.getMethods())) {
+ List<AnnotationNode> activeAnnotations = method.getAnnotations(ACTIVE_METHOD_TYPE);
+ if (!activeAnnotations.isEmpty()) {
+ if (method.isStatic()) {
+ addError("@ActiveMethod cannot be applied to static methods: " + method.getName(), method);
+ continue;
+ }
+ boolean blocking = memberHasValue(activeAnnotations.get(0), "blocking", false)
+ ? false : true; // default is true
+ transformMethod(classNode, method, actorField, blocking);
+ }
+ }
+ }
+
+ /**
+ * Adds the actor field to the class, initialised with a reactor
+ * that dispatches messages via InvokerHelper.
+ */
+ private FieldNode addActorField(ClassNode classNode, String fieldName) {
+ // Actor.reactor { msg -> InvokerHelper.invokeMethod(msg[0], msg[1], msg[2]) }
+ // The reactor receives [target, methodName, argsArray] and invokes the renamed method
+ Expression initializer = callX(
+ classX(ACTOR_TYPE),
+ "reactor",
+ args(buildDispatchLambda()));
+
+ return classNode.addField(fieldName,
+ Modifier.FINAL | Modifier.TRANSIENT | Modifier.PRIVATE,
+ ACTOR_TYPE,
+ initializer);
+ }
+
+ /**
+ * Builds the dispatch function:
+ * { msg -> InvokerHelper.invokeMethod(msg[0], "activeObject_" + msg[1], msg[2]) }
+ */
+ private Expression buildDispatchLambda() {
+ // We build a closure that:
+ // 1. Extracts target = msg[0], methodName = msg[1], args = msg[2]
+ // 2. Calls InvokerHelper.invokeMethod(target, "activeObject_" + methodName, args)
+ Parameter msgParam = new Parameter(ClassHelper.OBJECT_TYPE, "msg");
+ Expression msgVar = varX(msgParam);
+
+ // msg[0] = target, msg[1] = method name, msg[2] = args array
+ Expression target = callX(msgVar, "getAt", constX(0));
+ Expression methodName = callX(
+ constX(METHOD_NAME_PREFIX), "plus",
+ callX(msgVar, "getAt", constX(1)));
+ Expression argsExpr = callX(msgVar, "getAt", constX(2));
+
+ Expression invokeCall = callX(
+ classX(INVOKER_HELPER_TYPE),
+ "invokeMethod",
+ args(target, methodName, argsExpr));
+
+ Statement body = returnS(invokeCall);
+
+ org.codehaus.groovy.ast.expr.ClosureExpression closure =
+ new org.codehaus.groovy.ast.expr.ClosureExpression(
+ new Parameter[]{msgParam}, body);
+ closure.setVariableScope(new org.codehaus.groovy.ast.VariableScope());
+
+ return closure;
+ }
+
+ /**
+ * Transforms a single method:
+ * 1. Renames original to activeObject_methodName (private)
+ * 2. Replaces body with actor.sendAndGet([this, name, args]) dispatch
+ */
+ private void transformMethod(ClassNode classNode, MethodNode method,
+ FieldNode actorField, boolean blocking) {
+ String originalName = method.getName();
+ String renamedName = findUniqueName(classNode, originalName);
+
+ // Create the renamed private method with the original body
+ MethodNode renamed = new MethodNode(
+ renamedName,
+ Modifier.PRIVATE | Modifier.FINAL,
+ method.getReturnType(),
+ method.getParameters(),
+ method.getExceptions(),
+ method.getCode());
+ renamed.setSourcePosition(method);
+ classNode.addMethod(renamed);
+
+ // Build the message: [this, "originalName", [param1, param2, ...]]
+ List<Expression> argsList = new ArrayList<>();
+ for (Parameter p : method.getParameters()) {
+ argsList.add(varX(p.getName()));
+ }
+ Expression argsArray = new ArrayExpression(
+ ClassHelper.OBJECT_TYPE, argsList);
+
+ List<Expression> messageElements = new ArrayList<>();
+ messageElements.add(varX("this"));
+ messageElements.add(constX(originalName));
+ messageElements.add(argsArray);
+ Expression message = new org.codehaus.groovy.ast.expr.ListExpression(messageElements);
+
+ // actor.sendAndGet(message)
+ Expression sendCall = callX(
+ fieldX(actorField),
+ "sendAndGet",
+ args(message));
+
+ Statement newBody;
+ if (blocking) {
+ // AsyncSupport.await(actor.sendAndGet(message))
+ Expression awaitCall = callX(
+ classX(ASYNC_SUPPORT_TYPE),
+ "await",
+ args(sendCall));
+
+ if (method.getReturnType().equals(ClassHelper.VOID_TYPE)) {
+ newBody = stmt(awaitCall);
+ } else {
+ newBody = returnS(awaitCall);
+ }
+ } else {
+ // return actor.sendAndGet(message) — returns Awaitable
+ newBody = returnS(sendCall);
+ }
+
+ method.setCode(newBody);
Review Comment:
For blocking=false, the transformed method body returns the Awaitable from
actor.sendAndGet, but the MethodNode return type is left unchanged. If the
original method has a non-Object/primitive return type, this can produce
invalid bytecode (returning an object from a primitive-returning method) or
misleading signatures for Java callers. The transform should either (1) update
the method return type to Awaitable<OriginalType> (or Awaitable<Void>) or (2)
emit a compile error unless the declared return type is compatible (e.g.,
def/Object/Awaitable).
##########
src/main/java/groovy/concurrent/AsyncScope.java:
##########
@@ -115,35 +121,94 @@ static <T> T withCurrent(AsyncScope scope, Supplier<T> supplier) {
}
/**
- * Creates a scope, executes the closure within it, and ensures the
- * scope is closed on exit. The closure receives the scope as its
+ * Creates a scope, executes the body within it, and ensures the
+ * scope is closed on exit. The body receives the scope as its
* argument for launching child tasks.
*
* <pre>{@code
+ * // Java:
+ * var result = AsyncScope.withScope(scope -> {
+ * var a = scope.async(() -> computeA());
+ * var b = scope.async(() -> computeB());
+ * return List.of(AsyncSupport.await(a), AsyncSupport.await(b));
+ * });
+ *
+ * // Groovy (Closure overload added via extension method):
* def result = AsyncScope.withScope { scope ->
* def a = scope.async { computeA() }
* def b = scope.async { computeB() }
* return [await(a), await(b)]
* }
* }</pre>
+ *
+ * @param body the function to execute within the scope
+ * @param <T> the result type
+ * @return the body's return value
*/
- @SuppressWarnings("unchecked")
- static <T> T withScope(
- @ClosureParams(value = SimpleType.class, options = "groovy.concurrent.AsyncScope") Closure<T> body) {
+ static <T> T withScope(Function<AsyncScope, T> body) {
return withScope(AsyncSupport.getExecutor(), body);
}
/**
- * Creates a scope with the given executor, executes the closure,
+ * Creates a scope with the given executor, executes the body,
* and ensures the scope is closed on exit.
+ *
+ * @param executor the executor for child tasks
+ * @param body the function to execute within the scope
+ * @param <T> the result type
+ * @return the body's return value
*/
- @SuppressWarnings("unchecked")
- static <T> T withScope(Executor executor,
- @ClosureParams(value = SimpleType.class, options = "groovy.concurrent.AsyncScope") Closure<T> body) {
+ static <T> T withScope(Executor executor, Function<AsyncScope, T> body) {
Objects.requireNonNull(body, "body must not be null");
- try (AsyncScope scope = create(executor)) {
- return withCurrent(scope, () -> body.call(scope));
+ AsyncScope scope = create(executor);
+ T result;
+ try {
+ result = withCurrent(scope, () -> body.apply(scope));
+ } catch (Throwable bodyError) {
+ try {
+ scope.close();
+ } catch (Throwable closeError) {
+ if (closeError != bodyError) {
+ bodyError.addSuppressed(closeError);
+ }
+ }
+ if (bodyError instanceof RuntimeException re) throw re;
+ if (bodyError instanceof Error err) throw err;
+ throw new RuntimeException(bodyError);
}
+ scope.close();
+ return result;
+ }
+
+ /**
+ * Creates a scope with a timeout. If the body does not complete within
+ * the specified duration, all child tasks are cancelled and the scope
+ * throws {@link java.util.concurrent.TimeoutException}.
+ *
+ * @param timeout the maximum duration for the scope
+ * @param body the function to execute within the scope
+ * @param <T> the result type
+ * @return the body's return value
+ * @throws java.util.concurrent.TimeoutException if the timeout expires
+ * @since 6.0.0
Review Comment:
AsyncScope.withScope(Duration, ...) documents that it throws
TimeoutException on timeout, but DefaultAsyncScope.withScopeTimeout currently
throws CancellationException (and tests appear to expect
CancellationException). Please align the public API contract: either update the
implementation to throw TimeoutException, or adjust the javadoc/@throws to
match the actual exception type thrown.
##########
src/main/java/org/codehaus/groovy/runtime/ParallelCollectionExtensions.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.codehaus.groovy.runtime;
+
+import groovy.concurrent.Pool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * DGM-like extension methods that add parallel collection operations
+ * to {@link Collection}.
+ * <p>
+ * These methods use {@link Pool#current()} to obtain the current pool
+ * (typically set by {@link groovy.concurrent.ParallelScope#withPool}).
+ * If no pool is current, they fall back to {@link ForkJoinPool#commonPool()}.
+ * <p>
+ * All methods delegate to Java parallel streams with pool isolation:
+ * the stream operations run on the current pool's {@link ForkJoinPool},
+ * not the common pool.
+ * <p>
+ * Inspired by GPars' {@code GParsPoolUtil} category methods.
+ *
+ * @since 6.0.0
+ */
+public class ParallelCollectionExtensions {
+
+ // ---- Iteration ------------------------------------------------------
+
+ /**
+ * Iterates over the collection in parallel, applying the action
+ * to each element.
+ */
+ public static <T> void eachParallel(Collection<T> self, Consumer<T> action) {
+ withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().forEach(action)).join()
+ );
+ }
+
+ // ---- Transformation -------------------------------------------------
+
+ /**
+ * Transforms each element in parallel, returning a new list.
+ */
+ public static <T, R> List<R> collectParallel(Collection<T> self, Function<T, R> transform) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().map(transform).collect(Collectors.toList())).join()
+ );
+ }
+
+ // ---- Filtering ------------------------------------------------------
+
+ /**
+ * Filters the collection in parallel, returning elements that match.
+ */
+ public static <T> List<T> findAllParallel(Collection<T> self, Predicate<T> filter) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().filter(filter).collect(Collectors.toList())).join()
+ );
+ }
+
+ /**
+ * Finds the first element matching the predicate in encounter order.
+ * Although evaluation happens in parallel, the result is the matching
+ * element with the lowest index. Use {@link #findAnyParallel} if any
+ * match will do — it may be faster as it avoids ordering constraints.
+ */
+ public static <T> T findParallel(Collection<T> self, Predicate<T> filter) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().filter(filter).findFirst().orElse(null)).join()
+ );
+ }
+
+ /**
+ * Finds any element matching the predicate. May be faster than
+ * {@link #findParallel} as it does not preserve encounter order.
+ */
+ public static <T> T findAnyParallel(Collection<T> self, Predicate<T> filter) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().filter(filter).findAny().orElse(null)).join()
+ );
+ }
+
+ // ---- Predicates -----------------------------------------------------
+
+ /**
+ * Returns {@code true} if any element matches the predicate.
+ */
+ public static <T> boolean anyParallel(Collection<T> self, Predicate<T> predicate) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().anyMatch(predicate)).join()
+ );
+ }
+
+ /**
+ * Returns {@code true} if all elements match the predicate.
+ */
+ public static <T> boolean everyParallel(Collection<T> self, Predicate<T> predicate) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().allMatch(predicate)).join()
+ );
+ }
+
+ /**
+ * Counts elements matching the predicate.
+ */
+ public static <T> long countParallel(Collection<T> self, Predicate<T> predicate) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().filter(predicate).count()).join()
+ );
+ }
+
+ // ---- Aggregation ----------------------------------------------------
+
+ /**
+ * Finds the minimum element using the given comparator.
+ */
+ public static <T> T minParallel(Collection<T> self, Comparator<T> comparator) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().min(comparator).orElse(null)).join()
+ );
+ }
+
+ /**
+ * Finds the maximum element using the given comparator.
+ */
+ public static <T> T maxParallel(Collection<T> self, Comparator<T> comparator) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().max(comparator).orElse(null)).join()
+ );
+ }
+
+ /**
+ * Reduces the collection in parallel using the given operator.
+ */
+ public static <T> T sumParallel(Collection<T> self, BinaryOperator<T> accumulator) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().reduce(accumulator).orElse(null)).join()
+ );
+ }
+
+ /**
+ * Groups elements by the classifier function in parallel.
+ */
+ public static <T, K> Map<K, List<T>> groupByParallel(Collection<T> self,
+ Function<T, K> classifier) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream()
+ .collect(Collectors.groupingBy(classifier))).join()
+ );
+ }
+
+ // ---- Tier 2: additional operations ------------------------------------
+
+ /**
+ * Iterates over the collection in parallel with element indices.
+ * Index assignment is based on the collection's iteration order, but
+ * execution order is not guaranteed.
+ */
+ public static <T> void eachWithIndexParallel(Collection<T> self, BiConsumer<T, Integer> action) {
+ List<T> list = self instanceof List ? (List<T>) self : new ArrayList<>(self);
+ withCurrentFJP(fjp -> {
+ fjp.submit(() ->
+ IntStream.range(0, list.size()).parallel()
+ .forEach(i -> action.accept(list.get(i), i))).join();
+ return null;
+ });
+ }
+
+ /**
+ * Transforms each element into a collection and flattens the results
+ * in parallel (parallel flatMap).
+ */
+ public static <T, R> List<R> collectManyParallel(Collection<T> self,
+ Function<T, ? extends Collection<R>> transform) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream()
+ .flatMap(e -> transform.apply(e).stream())
+ .collect(Collectors.toList())).join()
+ );
+ }
+
+ /**
+ * Partitions the collection into two lists: elements that match the
+ * predicate and elements that don't.
+ *
+ * @return a list of two lists: [matching, non-matching]
+ */
+ public static <T> List<List<T>> splitParallel(Collection<T> self, Predicate<T> predicate) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> {
+ Map<Boolean, List<T>> parts = self.parallelStream()
+ .collect(Collectors.partitioningBy(predicate));
+ return List.of(parts.get(true), parts.get(false));
+ }).join()
+ );
+ }
+
+ /**
+ * Reduces the collection in parallel with a seed value.
+ * <p>
+ * <b>Note:</b> the accumulator must be associative for correct parallel
+ * results. Non-associative accumulators will produce undefined results.
+ *
+ * @param seed the initial value
+ * @param accumulator an associative reduction function
+ */
+ public static <T> T injectParallel(Collection<T> self, T seed, BinaryOperator<T> accumulator) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream().reduce(seed, accumulator)).join()
+ );
+ }
+
+ /**
+ * Filters elements using Groovy's {@code isCase} pattern matching
+ * in parallel. Supports the same filter types as Groovy's {@code grep}:
+ * Class, regex Pattern, Range, Collection, Closure, etc.
+ *
+ * @param filter the pattern to match against (uses {@code isCase})
+ */
+ public static <T> List<T> grepParallel(Collection<T> self, Object filter) {
+ return withCurrentFJP(fjp ->
+ fjp.submit(() -> self.parallelStream()
+ .filter(e -> InvokerHelper.invokeMethod(filter, "isCase", e) != Boolean.FALSE)
+ .collect(Collectors.toList())).join()
Review Comment:
grepParallel in ParallelCollectionExtensions references InvokerHelper, but the
class is neither imported nor fully qualified, so this file won't compile. Add
the missing import (org.codehaus.groovy.runtime.InvokerHelper) or fully qualify
the reference.
##########
src/main/java/org/codehaus/groovy/transform/ParallelASTTransformation.java:
##########
@@ -71,42 +80,80 @@ public void visit(final ASTNode[] nodes, final SourceUnit source) {
return;
}
- injectParallelThreadStart(forStatement, annotation);
+ rewriteForLoop(forStatement, annotation, source);
+ // Re-resolve variable scopes after AST rewrite
+ for (ClassNode classNode : source.getAST().getClasses()) {
+ new VariableScopeVisitor(source).visitClass(classNode);
+ }
}
- private static void injectParallelThreadStart(final ForStatement forStatement, final AnnotationNode annotation) {
- Statement originalBody = forStatement.getLoopBlock();
+ private static void rewriteForLoop(final ForStatement forLoop, final AnnotationNode annotation, final SourceUnit source) {
+ Statement originalBody = forLoop.getLoopBlock();
+ Parameter loopParameter = forLoop.getValueVariable();
+ Expression originalCollection = forLoop.getCollectionExpression();
+
+ // Rename references to the loop variable in the body to a synthetic
+ // name, avoiding a clash with the for loop's own parameter declaration.
+ String originalName = loopParameter.getName();
+ String internalName = "$parallel_" + originalName;
- // Use a closure parameter with the same loop variable name and curry(currentValue)
- // so each launched thread receives its own iteration value.
- Parameter loopParameter = forStatement.getValueVariable();
- Parameter workerParameter = new Parameter(loopParameter.getOriginType(), loopParameter.getName());
+ // Rename all VariableExpressions referencing the loop variable
+ new VariableRenamer(source, originalName, internalName).rename(originalBody);
Review Comment:
VariableRenamer renames every VariableExpression whose name matches the loop
variable, regardless of what it resolves to. This can break code when the loop
body contains a nested closure/lambda that declares its own parameter or local
with the same name, because those references are renamed too. Restrict renaming
to VariableExpressions whose accessedVariable is the for-loop Parameter, and
preserve the source position and accessedVariable when creating the
replacement expression.
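To illustrate the difference between name-based and resolution-based renaming,
here is a minimal, self-contained sketch. It does not use the Groovy AST API:
Decl and VarRef are hypothetical stand-ins for a declared variable (such as the
for-loop Parameter) and a VariableExpression, and renameRefs is an assumed
helper name, not a method from the PR. The point is only that the match is made
on the identity of the resolved declaration, not on the name string, and that
the replacement keeps the original position and resolved declaration.

```java
import java.util.ArrayList;
import java.util.List;

public class ScopedRenameSketch {
    /** Hypothetical stand-in for a declared variable, e.g. the loop Parameter. */
    static final class Decl {
        final String name;
        Decl(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a variable reference: name, resolved declaration, source line. */
    static final class VarRef {
        final String name;
        final Decl accessed;
        final int line;
        VarRef(String name, Decl accessed, int line) {
            this.name = name;
            this.accessed = accessed;
            this.line = line;
        }
    }

    /** Renames only references that resolve to {@code target}; all others pass through untouched. */
    static List<VarRef> renameRefs(List<VarRef> body, Decl target, String newName) {
        List<VarRef> out = new ArrayList<>();
        for (VarRef ref : body) {
            if (ref.accessed == target) {
                // Identity match on the declaration; keep the line and the resolved declaration.
                out.add(new VarRef(newName, ref.accessed, ref.line));
            } else {
                // Same name but a different declaration (e.g. a nested closure parameter).
                out.add(ref);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Decl loopVar = new Decl("x");       // the for-loop variable
        Decl closureParam = new Decl("x");  // a nested closure parameter that shadows it
        List<VarRef> body = List.of(
                new VarRef("x", loopVar, 3),
                new VarRef("x", closureParam, 4));
        List<VarRef> renamed = renameRefs(body, loopVar, "$parallel_x");
        System.out.println(renamed.get(0).name); // prints "$parallel_x"
        System.out.println(renamed.get(1).name); // prints "x"
    }
}
```

In the actual transformation, the equivalent check would presumably compare a
VariableExpression's accessedVariable against the loop Parameter; how that fits
into VariableRenamer is left to the PR author.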