I have an initial pass at a replacement for one-way async support that uses a target invoker rather than an interceptor. I have been able to test it in chianti and have ported it to the latest trunk. For some reason related to SCA SPI test failures, I can't yet build the main trunk successfully, so I haven't verified the target invoker there. In any case, in the interest of getting something into the trunk sooner rather than later, here's the patch.
Note that the target invoker should use a monitor to flag any error that occurs during an invoke on a separate thread. However, when I try to use a monitor I get the build error below (in chianti), so I have commented out the use of the monitor for now.
 
Running localwire.LocalWireTestCase
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.511 sec <<< FA
ILURE!
testMessage(localwire.LocalWireTestCase)  Time elapsed: 0.501 sec  <<< ERROR!
org.apache.tuscany.spi.builder.BuilderConfigException: No builder registered for
 implementation [org.apache.tuscany.core.implementation.java.JavaImplementation]
 
Context stack trace: [TargetComponent]
        at org.apache.tuscany.core.builder.BuilderRegistryImpl.build(BuilderRegi
stryImpl.java:93)
        at org.apache.tuscany.core.implementation.system.builder.SystemComposite
Builder.build(SystemCompositeBuilder.java:97)
        at org.apache.tuscany.core.builder.BuilderRegistryImpl.build(BuilderRegi
stryImpl.java:99)
        at org.apache.tuscany.core.deployer.DeployerImpl.build(DeployerImpl.java
:125)
        at org.apache.tuscany.core.deployer.DeployerImpl.deploy(DeployerImpl.jav
a:91)
        at org.apache.tuscany.core.launcher.Launcher.bootApplication(Launcher.ja
va:195)
        at org.apache.tuscany.test.SCATestCase.setUp(SCATestCase.java:40)
        at junit.framework.TestCase.runBare(TestCase.java:125)
        at junit.framework.TestResult$1.protect(TestResult.java:106)
        at junit.framework.TestResult.runProtected(TestResult.java:124)
        at junit.framework.TestResult.run(TestResult.java:109)
        at junit.framework.TestCase.run(TestCase.java:118)
        at junit.framework.TestSuite.runTest(TestSuite.java:208)
        at junit.framework.TestSuite.run(TestSuite.java:203)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
sorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:585)
        at org.apache.maven.surefire.junit.JUnitTestSet.execute(JUnitTestSet.jav
a:210)
        at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.executeTes
tSet(AbstractDirectoryTestSuite.java:135)
        at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.execute(Ab
stractDirectoryTestSuite.java:122)
        at org.apache.maven.surefire.Surefire.run(Surefire.java:129)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces
sorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:585)
        at org.apache.maven.surefire.booter.SurefireBooter.runSuitesInProcess(Su
refireBooter.java:225)
        at org.apache.maven.surefire.booter.SurefireBooter.main(SurefireBooter.j
ava:747)
 

Results :
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
Index: 
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
===================================================================
--- 
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
    (revision 423959)
+++ 
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
    (working copy)
@@ -40,6 +40,8 @@
 
         System.out.println("Main thread " + Thread.currentThread());
         customer.purchaseGoods();
+        System.out.println("Sleeping ...");
+        Thread.sleep(1000);
 
     }
 }
Index: 
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
===================================================================
--- 
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
 (revision 423959)
+++ 
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
 (working copy)
@@ -58,7 +58,7 @@
         for (InboundInvocationChain chain : 
wire.getInvocationChains().values()) {
             // TODO fix this - it should be represented by the model and not 
through an annotation
             if (chain.getMethod().getAnnotation(OneWay.class) != null) {
-                chain.addInterceptor(new AsyncInterceptor(workManager, 
monitor));
+                // chain.addInterceptor(new AsyncInterceptor(workManager, 
monitor));
             }
         }
     }
Index: 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
===================================================================
--- 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
 (revision 423959)
+++ 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
 (working copy)
@@ -19,8 +19,10 @@
 import java.lang.reflect.Method;
 
 import org.apache.tuscany.core.implementation.PojoConfiguration;
+import org.apache.tuscany.spi.component.ComponentRuntimeException;
 import org.apache.tuscany.spi.component.TargetException;
 import org.apache.tuscany.spi.component.TargetNotFoundException;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
 import org.apache.tuscany.spi.wire.InboundWire;
 import org.apache.tuscany.spi.wire.TargetInvoker;
 import org.apache.tuscany.spi.wire.OutboundWire;
@@ -28,6 +30,8 @@
 
 import org.apache.tuscany.core.implementation.PojoAtomicComponent;
 import org.apache.tuscany.core.injection.WireObjectFactory;
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
+import org.osoa.sca.annotations.OneWay;
 
 /**
  * The runtime instantiation of Java component implementations
@@ -36,6 +40,9 @@
  */
 public class JavaAtomicComponent<T> extends PojoAtomicComponent<T> {
 
+    private WorkScheduler workScheduler;
+    private AsyncMonitor monitor;
+
     public JavaAtomicComponent(String name, PojoConfiguration configuration) {
         super(name, configuration);
         this.scope = configuration.getScopeContainer().getScope();
@@ -51,6 +58,18 @@
         return wireService.createProxy(wire);
     }
 
+    // REVIEW using setWorkScheduler rather than ctor parameter to avoid
+    //        every use of ctor to be affected
+    public void setWorkScheduler(WorkScheduler workScheduler) {
+        this.workScheduler = workScheduler;
+    }
+    
+    // REVIEW using setMonitor rather than ctor parameter to avoid
+    //        every use of ctor to be affected
+    public void setMonitor(AsyncMonitor monitor) {
+        this.monitor = monitor;
+    }
+
     public T getServiceInstance() throws TargetException {
         if (serviceInterfaces.size() == 0) {
             return getTargetInstance();
@@ -62,7 +81,22 @@
     }
 
     public TargetInvoker createTargetInvoker(String serviceName, Method 
operation) {
-        return new JavaTargetInvoker(operation, this);
+        TargetInvoker targetInvoker = null;
+        if (operation.getAnnotation(OneWay.class) != null) {
+            if (workScheduler == null) {
+                // TODO Make sure appropriate exception is thrown
+                throw new ComponentRuntimeException("Need an instance of 
workScheduler");
+            }
+            if (monitor == null) {
+                // TODO Make sure appropriate exception is thrown
+                // throw new ComponentRuntimeException("Need an instance of 
monitor");
+            }
+            targetInvoker = new AsyncJavaTargetInvoker(operation, this, 
workScheduler, monitor);
+        }
+        else {
+            targetInvoker = new JavaTargetInvoker(operation, this);
+        }
+        return targetInvoker;
     }
 
     protected ObjectFactory<?> createWireFactory(OutboundWire wire) {
Index: 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
===================================================================
--- 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
      (revision 0)
+++ 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
      (revision 0)
@@ -0,0 +1,152 @@
+/**
+ *
+ *  Copyright 2005 The Apache Software Foundation or its licensors, as 
applicable.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tuscany.core.implementation.java;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InvocationRuntimeException;
+import org.apache.tuscany.spi.wire.Message;
+import org.apache.tuscany.spi.wire.MessageChannel;
+import org.apache.tuscany.spi.wire.TargetInvoker;
+import org.osoa.sca.CompositeContext;
+import org.osoa.sca.CurrentCompositeContext;
+import org.osoa.sca.SCA;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ * 
+ *
+ * @version $Rev: 420514 $ $Date: 2006-07-10 08:53:18 -0400 (Mon, 10 Jul 2006) 
$
+ */
+public class AsyncJavaTargetInvoker extends JavaTargetInvoker {
+
+    private static final ContextBinder BINDER = new ContextBinder();
+    private static final Message RESPONSE = new ImmutableMessage();
+
+    private WorkScheduler workScheduler;
+    private AsyncMonitor monitor;
+    
+    /**
+     * Creates a new invoker
+     *
+     * @param operation the operation the invoker is associated with
+     * @param context   the scope component the component is resolved in
+     * @param workManager used to spawn invocation work
+     */
+    public AsyncJavaTargetInvoker(Method operation, JavaAtomicComponent 
context, WorkScheduler workScheduler, AsyncMonitor monitor) {
+        super(operation, context);
+        this.workScheduler = workScheduler;
+        this.monitor = monitor;
+    }
+
+    // Override invocation methods to defer invocation to work item
+    // Both methods return null to indicate asynchrony; result will
+    // be conveyed by callback
+    @Override
+    public Object invokeTarget(Object payload) throws 
InvocationTargetException {
+        final CompositeContext currentContext = 
CurrentCompositeContext.getContext();
+        final Object finalPayload = payload;
+        // Schedule the invocation of the next interceptor in a new Work 
instance
+        try {
+            workScheduler.scheduleWork(new Runnable() {
+                public void run() {
+                    CompositeContext oldContext = 
CurrentCompositeContext.getContext();
+                    try {
+                        
AsyncJavaTargetInvoker.BINDER.setContext(currentContext);
+                        // REVIEW response must be null for one-way and 
non-null for callback
+                        Object response = 
AsyncJavaTargetInvoker.super.invokeTarget(finalPayload);
+                    } catch (Exception e) {
+                        // REVIEW uncomment when it is available
+                        // monitor.executionError(e);
+                    } finally {
+                        AsyncJavaTargetInvoker.BINDER.setContext(oldContext);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new ServiceRuntimeException(e);
+        }
+        return RESPONSE;
+    }
+
+    public Message invoke(Message msg) throws InvocationRuntimeException {
+        // can't just call overriden invoke because it would bypass async
+        try {
+            Object resp = invokeTarget(msg.getBody());
+            return (Message)resp;
+        } catch (InvocationTargetException e) {
+            return null;
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+    
+    public AsyncJavaTargetInvoker clone() throws CloneNotSupportedException {
+        AsyncJavaTargetInvoker invoker = (AsyncJavaTargetInvoker) 
super.clone();
+        invoker.workScheduler = this.workScheduler;
+        invoker.monitor = this.monitor;
+        
+        return invoker;
+    }
+
+    private static class ContextBinder extends SCA {
+        public void setContext(CompositeContext context) {
+            setCompositeContext(context);
+        }
+
+        public void start() {
+            throw new AssertionError();
+        }
+
+        public void stop() {
+            throw new AssertionError();
+        }
+    }
+
+    /**
+     * A dummy message passed back on an invocation
+     */
+    private static class ImmutableMessage implements Message {
+
+        public Object getBody() {
+            return null;
+        }
+
+        public void setBody(Object body) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void setTargetInvoker(TargetInvoker invoker) {
+            throw new UnsupportedOperationException();
+        }
+
+        public TargetInvoker getTargetInvoker() {
+            return null;
+        }
+
+        public MessageChannel getCallbackChannel() {
+            return null;
+        }
+
+        public Message getRelatedCallbackMessage() {
+            return null;
+        }
+    }
+}
Index: 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
===================================================================
--- 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
        (revision 423959)
+++ 
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
        (working copy)
@@ -42,6 +42,7 @@
 import org.apache.tuscany.core.implementation.PojoConfiguration;
 import org.apache.tuscany.core.injection.MethodEventInvoker;
 import org.apache.tuscany.core.injection.PojoObjectFactory;
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
 import org.apache.tuscany.core.wire.InboundInvocationChainImpl;
 import org.apache.tuscany.core.wire.InboundWireImpl;
 import org.apache.tuscany.core.wire.InvokerInterceptor;
@@ -55,6 +56,35 @@
  */
 public class JavaComponentBuilder extends 
ComponentBuilderExtension<JavaImplementation> {
 
+    // private PolicyBuilderRegistry builderRegistry;
+    // private AsyncMonitor monitor;
+
+    // REVIEW this seems to cause build problems
+    /*
+    @Init(eager = true)
+    public void init() {
+        // REVIEW is this needed?
+        // builderRegistry.registerTargetBuilder(INITIAL, this);
+        if (monitor == null) {
+            monitor = new NullMonitorFactory().getMonitor(AsyncMonitor.class);
+        }
+    }
+    */
+
+    /*
+    @Autowire
+    public void setBuilderRegistry(PolicyBuilderRegistry builderRegistry) {
+        this.builderRegistry = builderRegistry;
+    }
+    */
+
+    /*
+    @org.apache.tuscany.spi.annotation.Monitor
+    public void setMonitor(AsyncMonitor monitor) {
+        this.monitor = monitor;
+    }
+    */
+
     @SuppressWarnings("unchecked")
     public AtomicComponent<?> build(CompositeComponent<?> parent,
                                     ComponentDefinition<JavaImplementation> 
definition,
@@ -108,6 +138,9 @@
         
configuration.getConstructorParamNames().addAll(ctorDef.getInjectionNames());
 
         JavaAtomicComponent component = new 
JavaAtomicComponent(definition.getName(), configuration);
+        // REVIEW setting these (as opposed to using ctor parms) may not be 
appropriate
+        component.setWorkScheduler(workScheduler);
+        // component.setMonitor(monitor);
 
         // handle properties
         for (JavaMappedProperty<?> property : 
componentType.getProperties().values()) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to